diff --git a/client/duneapi/client.go b/client/duneapi/client.go
index 47ef541..4db7e0a 100644
--- a/client/duneapi/client.go
+++ b/client/duneapi/client.go
@@ -2,7 +2,6 @@ package duneapi
 
 import (
 	"bytes"
-	"context"
 	"encoding/json"
 	"fmt"
 	"log/slog"
@@ -19,18 +18,9 @@ const (
 )
 
 type BlockchainIngester interface {
-	// Sync pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
-	// it will block until:
-	// - the context is cancelled
-	// - channel is closed
-	// - a fatal error occurs
-	Sync(ctx context.Context, blocksCh <-chan models.RPCBlock) error
-
 	// SendBlock sends a block to DuneAPI
 	SendBlock(payload models.RPCBlock) error
 
-	// TODO:
-	// - Batching multiple blocks in a single request
 	// - API to discover the latest block number ingested
 	// this can also provide "next block ranges" to push to DuneAPI
 	// - log/metrics on catching up/falling behind, distance from tip of chain
@@ -69,25 +59,8 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
 	}, nil
 }
 
-func (c *client) Sync(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case payload, ok := <-blocksCh:
-			if !ok {
-				return nil // channel closed
-			}
-			if err := c.SendBlock(payload); err != nil {
-				// TODO: implement DeadLetterQueue
-				// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
-				c.log.Error("SendBlock failed, continuing..", "blockNumber", payload.BlockNumber, "error", err)
-			}
-		}
-	}
-}
-
 // SendBlock sends a block to DuneAPI
+// TODO: support batching multiple blocks in a single request
 func (c *client) SendBlock(payload models.RPCBlock) error {
 	start := time.Now()
 	buffer := c.bufPool.Get().(*bytes.Buffer)
diff --git a/client/jsonrpc/client.go b/client/jsonrpc/client.go
index 5ba249e..c3ecd56 100644
--- a/client/jsonrpc/client.go
+++ b/client/jsonrpc/client.go
@@ -17,12 +17,6 @@ import (
 type BlockchainClient interface {
 	LatestBlockNumber() (int64, error)
 	BlockByNumber(ctx context.Context, blockNumber int64) (models.RPCBlock, error)
-
-	// SendBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
-	// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
-	// it will run continuously until the context is cancelled
-	SendBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64) error
-
 	Close() error
 }
 
@@ -37,13 +31,13 @@ type rpcClient struct {
 	bufPool *sync.Pool
 }
 
-func NewRPCClient(cfg Config, log *slog.Logger) *rpcClient { // revive:disable-line:unexported-return
+func NewClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:disable-line:unexported-return
 	client := retryablehttp.NewClient()
 	client.RetryMax = MaxRetries
 	client.Logger = log
 	client.CheckRetry = retryablehttp.DefaultRetryPolicy
 	client.Backoff = retryablehttp.LinearJitterBackoff
-	return &rpcClient{
+	rpc := &rpcClient{
 		client: client,
 		cfg:    cfg,
 		log:    log,
@@ -53,6 +47,12 @@ func NewRPCClient(cfg Config, log *slog.Logger) *rpcClient { // revive:disable-l
 			},
 		},
 	}
+	// let's validate that the RPC node is up & reachable
+	_, err := rpc.LatestBlockNumber()
+	if err != nil {
+		return nil, fmt.Errorf("failed to connect to jsonrpc: %w", err)
+	}
+	return rpc, nil
 }
 
 func (c *rpcClient) LatestBlockNumber() (int64, error) {
diff --git a/client/jsonrpc/opstack.go b/client/jsonrpc/opstack.go
index 9e9087d..1dfea39 100644
--- a/client/jsonrpc/opstack.go
+++ b/client/jsonrpc/opstack.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"fmt"
 	"log/slog"
+	"time"
 
 	"github.com/duneanalytics/blockchain-ingester/models"
 	"golang.org/x/sync/errgroup"
@@ -16,9 +17,12 @@ type OpStackClient struct {
 
 var _ BlockchainClient = &OpStackClient{}
 
-func NewOpStackClient(cfg Config, log *slog.Logger) *OpStackClient {
-	rpcClient := NewRPCClient(cfg, log)
-	return &OpStackClient{*rpcClient}
+func NewOpStackClient(log *slog.Logger, cfg Config) (*OpStackClient, error) {
+	rpcClient, err := NewClient(log, cfg)
+	if err != nil {
+		return nil, err
+	}
+	return &OpStackClient{*rpcClient}, nil
 }
 
 // BlockByNumber returns the block with the given blockNumber.
@@ -31,6 +35,13 @@ func NewOpStackClient(cfg Config, log *slog.Logger) *OpStackClient {
 //
 // we should handle the case where it is not available
 func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (models.RPCBlock, error) {
+	tStart := time.Now()
+	defer func() {
+		c.log.Info("BlockByNumber",
+			"blockNumber", blockNumber,
+			"duration", time.Since(tStart),
+		)
+	}()
 	blockNumberHex := fmt.Sprintf("0x%x", blockNumber)
 
 	// TODO: split this into mandatory and optional methods
@@ -79,25 +90,3 @@ func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (m
 		Payload: buffer.Bytes(),
 	}, nil
 }
-
-func (c *OpStackClient) SendBlocks(
-	ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64,
-) error {
-	dontStop := endBlockNumber <= startBlockNumber
-	for blockNumber := startBlockNumber; dontStop || startBlockNumber <= endBlockNumber; blockNumber++ {
-		block, err := c.BlockByNumber(ctx, blockNumber)
-		if err != nil {
-			c.log.Error("Failed to get block by number",
-				"blockNumber", blockNumber,
-				"error", err,
-			)
-			return err
-		}
-		select {
-		case <-ctx.Done():
-			return nil
-		case outChan <- block:
-		}
-	}
-	return nil
-}
diff --git a/cmd/main.go b/cmd/main.go
index f522c06..f873a3c 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -14,7 +14,10 @@ import (
 	"time"
 
 	"github.com/duneanalytics/blockchain-ingester/client/duneapi"
+	"github.com/duneanalytics/blockchain-ingester/client/jsonrpc"
 	"github.com/duneanalytics/blockchain-ingester/config"
+	"github.com/duneanalytics/blockchain-ingester/ingester"
+	"github.com/duneanalytics/blockchain-ingester/models"
 )
 
 func init() {
@@ -42,21 +45,34 @@ func main() {
 	defer duneClient.Close()
 
 	var wg stdsync.WaitGroup
+	var rpcClient jsonrpc.BlockchainClient
 
-	// rpcClient, err := jsonrpc.NewClient(&rpc.Config{
-	// NodeURL: cfg.BlockchainNodeURL,
-	// PoolInterval: cfg.PoolInterval,
-	// Stack: cfg.BlockchainStack,
-	// })
+	switch cfg.RPCStack {
+	case models.OpStack:
+		rpcClient, err = jsonrpc.NewOpStackClient(logger, jsonrpc.Config{
+			URL: cfg.RPCNode.NodeURL,
+		})
+	default:
+		stdlog.Fatalf("unsupported RPC stack: %s", cfg.RPCStack)
+	}
+	if err != nil {
+		stdlog.Fatal(err)
+	}
+
+	ingester := ingester.New(
+		logger,
+		rpcClient,
+		duneClient,
+		ingester.Config{
+			PollInterval:     cfg.PollInterval,
+			StartBlockHeight: cfg.BlockHeight,
+		},
+	)
 
-	// harvester, err := harvester.New(&harvester.Config{
-	// Logger: logger,
-	// DuneClient: duneClient,
-	// RPCClient: rpcClient,
-	// })
+	wg.Add(1)
+	ingester.Run(context.Background(), &wg)
 
-	// wg.Add(1)
-	// harvester.Run(ctx, &wg)
+	// TODO: add a metrics exporter or healthcheck http endpoint ?
 
 	_, cancelFn := context.WithCancel(context.Background())
 	quit := make(chan os.Signal, 1)
diff --git a/config/config.go b/config/config.go
index 0da5894..2993adb 100644
--- a/config/config.go
+++ b/config/config.go
@@ -21,8 +21,7 @@ func (d DuneClient) HasError() error {
 }
 
 type RPCClient struct {
-	NodeURL      string        `long:"rpc-node-url" env:"RPC_NODE_URL" description:"URL for the blockchain node"`
-	PoolInterval time.Duration `long:"rpc-pool-interval" env:"RPC_POOL_INTERVAL" description:"Interval to pool the blockchain node" default:"500millis"` // nolint:lll
+	NodeURL string `long:"rpc-node-url" env:"RPC_NODE_URL" description:"URL for the blockchain node"`
 }
 
 func (r RPCClient) HasError() error {
@@ -33,12 +32,12 @@ func (r RPCClient) HasError() error {
 }
 
 type Config struct {
+	BlockHeight    int64  `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"`      // nolint:lll
+	BlockchainName string `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
 	Dune           DuneClient
+	PollInterval   time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"500ms"` // nolint:lll
 	RPCNode        RPCClient
-	BlockchainName string          `long:"blockchain-name" env:"BLOCKCHAIN_NAME" description:"name of the blockchain" required:"true"` // nolint:lll
-	RPCStack       models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"`         // nolint:lll
-	BatchSize      int             `long:"batch-size" env:"BATCH_SIZE" description:"number of blocks to submit to Dune" default:"3"`   // nolint:lll
-	BlockHeight    int64           `long:"block-height" env:"BLOCK_HEIGHT" description:"block height to start from" default:"-1"`      // nolint:lll
+	RPCStack       models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
 }
 
 func (c Config) HasError() error {
diff --git a/ingester/ingester.go b/ingester/ingester.go
new file mode 100644
index 0000000..97d807f
--- /dev/null
+++ b/ingester/ingester.go
@@ -0,0 +1,100 @@
+// Package ingester pulls blocks from a blockchain RPC node and pushes them to DuneAPI.
+package ingester
+
+import (
+	"context"
+	"log/slog"
+	"sync"
+	"time"
+
+	"github.com/duneanalytics/blockchain-ingester/client/duneapi"
+	"github.com/duneanalytics/blockchain-ingester/client/jsonrpc"
+	"github.com/duneanalytics/blockchain-ingester/models"
+)
+
+// Ingester moves blocks from a blockchain node to DuneAPI.
+type Ingester interface {
+	// Run starts ingesting and blocks until the ingestion loops have returned.
+	// It calls wg.Done() before returning.
+	Run(ctx context.Context, wg *sync.WaitGroup) error
+
+	// ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
+	// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
+	// it will run continuously until the context is cancelled
+	ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64) error
+
+	// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
+	// it will block until:
+	//   - the context is cancelled
+	//   - channel is closed
+	//   - a fatal error occurs
+	SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error
+
+	// This is just a placeholder for now
+	Info() Info
+}
+
+// defaultMaxBatchSize replaces a Config.MaxBatchSize that is not positive.
+const defaultMaxBatchSize = 1
+
+// Config holds the settings of an ingester.
+type Config struct {
+	// MaxBatchSize is the capacity of the channel between ConsumeBlocks and SendBlocks.
+	MaxBatchSize int
+
+	// StartBlockHeight is the first block to ingest; <= 0 means the node's latest block.
+	StartBlockHeight int64
+
+	// PollInterval is how long to wait before asking the node again for a new block.
+	PollInterval time.Duration
+}
+
+// Info holds the progress counters and the errors collected while ingesting.
+type Info struct {
+	LatestBlockNumber   int64
+	IngestedBlockNumber int64
+	ConsumedBlockNumber int64
+	RPCErrors           []ErrorInfo
+	DuneErrors          []ErrorInfo
+}
+
+// ErrorInfo describes a failure that happened while handling a block.
+type ErrorInfo struct {
+	Timestamp   time.Time
+	BlockNumber int64
+	Error       error
+}
+
+// ingester is the Ingester implementation returned by New.
+type ingester struct {
+	log  *slog.Logger
+	node jsonrpc.BlockchainClient
+	dune duneapi.BlockchainIngester
+	cfg  Config
+	info Info
+}
+
+// New returns an Ingester that reads blocks from node and sends them to dune.
+// A cfg.MaxBatchSize <= 0 is replaced by defaultMaxBatchSize.
+func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.BlockchainIngester, cfg Config) Ingester {
+	ing := &ingester{
+		log:  log,
+		node: node,
+		dune: dune,
+		cfg:  cfg,
+		info: Info{
+			RPCErrors:  []ErrorInfo{},
+			DuneErrors: []ErrorInfo{},
+		},
+	}
+	// MaxBatchSize is used as a channel capacity and make panics on a negative one
+	if ing.cfg.MaxBatchSize <= 0 {
+		ing.cfg.MaxBatchSize = defaultMaxBatchSize
+	}
+	return ing
+}
+
+// Info is a placeholder for now: it always returns an empty Info.
+func (i *ingester) Info() Info {
+	return Info{}
+}
diff --git a/ingester/mainloop.go b/ingester/mainloop.go
new file mode 100644
index 0000000..5d9b1e5
--- /dev/null
+++ b/ingester/mainloop.go
@@ -0,0 +1,217 @@
+package ingester
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/duneanalytics/blockchain-ingester/models"
+	"github.com/go-errors/errors"
+	"golang.org/x/sync/errgroup"
+)
+
+// Run resolves the first block to ingest, then runs ConsumeBlocks, SendBlocks and
+// ReportProgress concurrently and waits for them with errGroup.Wait, whose result it returns.
+// It fails early if the latest block number cannot be read, and calls wg.Done() on exit.
+func (i *ingester) Run(ctx context.Context, wg *sync.WaitGroup) error {
+	defer wg.Done()
+
+	inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize)
+	defer close(inFlightChan)
+
+	var err error
+
+	// a start height <= 0 means: begin at the node's latest block
+	startBlockNumber := i.cfg.StartBlockHeight
+	if startBlockNumber <= 0 {
+		startBlockNumber, err = i.node.LatestBlockNumber()
+		if err != nil {
+			return errors.Errorf("failed to get latest block number: %w", err)
+		}
+	}
+	i.log.Info("Starting ingester",
+		"maxBatchSize", i.cfg.MaxBatchSize,
+		"startBlockHeight", i.cfg.StartBlockHeight,
+		"startBlockNumber", startBlockNumber,
+	)
+
+	errGroup, ctx := errgroup.WithContext(ctx)
+	errGroup.Go(func() error {
+		return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, -1)
+	})
+	errGroup.Go(func() error {
+		return i.SendBlocks(ctx, inFlightChan)
+	})
+	errGroup.Go(func() error {
+		return i.ReportProgress(ctx)
+	})
+
+	return errGroup.Wait()
+}
+
+// ConsumeBlocks fetches blocks from the RPC node, starting at startBlockNumber, and pushes
+// them to outChan. When endBlockNumber is lower than startBlockNumber (e.g. -1) it keeps
+// following the chain, otherwise it stops after endBlockNumber (inclusive).
+// A block that cannot be fetched is logged, recorded in info.RPCErrors and skipped.
+// It returns nil when the range is exhausted or the context is cancelled.
+func (i *ingester) ConsumeBlocks(
+	ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64,
+) error {
+	dontStop := endBlockNumber < startBlockNumber
+	latestBlockNumber := i.tryUpdateLatestBlockNumber()
+
+	// waitForBlock polls the node every PollInterval until blockNumber is available or
+	// ctx is cancelled; it returns the latest block number known at that point.
+	waitForBlock := func(blockNumber, latestBlockNumber int64) int64 {
+		for blockNumber > latestBlockNumber {
+			i.log.Info(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval),
+				"blockNumber", blockNumber,
+				"latestBlockNumber", latestBlockNumber,
+			)
+			select {
+			case <-ctx.Done():
+				return latestBlockNumber
+			case <-time.After(i.cfg.PollInterval):
+			}
+			latestBlockNumber = i.tryUpdateLatestBlockNumber()
+		}
+		return latestBlockNumber
+	}
+
+	for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {
+		latestBlockNumber = waitForBlock(blockNumber, latestBlockNumber)
+		if ctx.Err() != nil {
+			// cancelled: stop instead of fetching (and skipping) further blocks
+			return nil
+		}
+		startTime := time.Now()
+
+		block, err := i.node.BlockByNumber(ctx, blockNumber)
+		if err != nil {
+			i.log.Error("Failed to get block by number, continuing..",
+				"blockNumber", blockNumber,
+				"latestBlockNumber", latestBlockNumber,
+				"error", err,
+			)
+			i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
+				Timestamp:   time.Now(),
+				BlockNumber: blockNumber,
+				Error:       err,
+			})
+			// TODO: should I sleep (backoff) here?
+			continue
+		}
+
+		atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
+		getBlockElapsed := time.Since(startTime)
+
+		select {
+		case <-ctx.Done():
+			return nil
+		case outChan <- block:
+		}
+
+		distanceFromLatest := latestBlockNumber - block.BlockNumber
+		if distanceFromLatest > 0 {
+			// TODO: improve logs of processing speed and catchup estimated ETA
+			i.log.Info("We're behind, trying to catch up..",
+				"blockNumber", block.BlockNumber,
+				"latestBlockNumber", latestBlockNumber,
+				"distanceFromLatest", distanceFromLatest,
+				"getBlockElapsedMillis", getBlockElapsed.Milliseconds(),
+				"elapsedMillis", time.Since(startTime).Milliseconds(),
+			)
+		}
+	}
+	return nil
+}
+
+// SendBlocks forwards every block received on blocksCh to DuneAPI.
+// A failed send is logged and recorded in info.DuneErrors, then the loop carries on.
+// It returns ctx.Err() when the context is cancelled and nil when blocksCh is closed.
+func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case payload, ok := <-blocksCh:
+			// TODO: we should batch RPC blocks here before sending to Dune.
+			if !ok {
+				return nil // channel closed
+			}
+			if err := i.dune.SendBlock(payload); err != nil {
+				// TODO: implement DeadLetterQueue
+				// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
+				i.log.Error("SendBlock failed, continuing..", "blockNumber", payload.BlockNumber, "error", err)
+				i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
+					Timestamp:   time.Now(),
+					BlockNumber: payload.BlockNumber,
+					Error:       err,
+				})
+			} else {
+				atomic.StoreInt64(&i.info.IngestedBlockNumber, payload.BlockNumber)
+			}
+		}
+	}
+}
+
+// tryUpdateLatestBlockNumber asks the node for its latest block number and stores it in info.
+// If the node call fails, the error is logged and the previously stored value is returned.
+func (i *ingester) tryUpdateLatestBlockNumber() int64 {
+	latest, err := i.node.LatestBlockNumber()
+	if err != nil {
+		i.log.Error("Failed to get latest block number, continuing..", "error", err)
+		return atomic.LoadInt64(&i.info.LatestBlockNumber)
+	}
+	atomic.StoreInt64(&i.info.LatestBlockNumber, latest)
+	return latest
+}
+
+// ReportProgress logs the ingestion progress every 20 seconds until the context is
+// cancelled, at which point it returns nil.
+func (i *ingester) ReportProgress(ctx context.Context) error {
+	timer := time.NewTicker(20 * time.Second)
+	defer timer.Stop()
+
+	previousTime := time.Now()
+	previousDistance := int64(0)
+	previousIngested := atomic.LoadInt64(&i.info.IngestedBlockNumber)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case tNow := <-timer.C:
+			latest := atomic.LoadInt64(&i.info.LatestBlockNumber)
+			lastIngested := atomic.LoadInt64(&i.info.IngestedBlockNumber)
+			lastConsumed := atomic.LoadInt64(&i.info.ConsumedBlockNumber)
+
+			// without a previous ingested block there is no baseline: the delta would be
+			// the absolute block number, not a number of blocks processed
+			var blocksPerSec float64
+			if previousIngested > 0 {
+				blocksPerSec = float64(lastIngested-previousIngested) / tNow.Sub(previousTime).Seconds()
+			}
+			newDistance := latest - lastIngested
+			fallingBehind := newDistance > (previousDistance + 1) // TODO: make it more stable
+
+			i.log.Info("Info",
+				"latestBlockNumber", latest,
+				"ingestedBlockNumber", lastIngested,
+				"consumedBlockNumber", lastConsumed,
+				"distanceFromLatest", latest-lastIngested,
+				"FallingBehind", fallingBehind,
+				"blocksPerSec", fmt.Sprintf("%.2f", blocksPerSec),
+			)
+			previousIngested = lastIngested
+			previousDistance = newDistance
+			previousTime = tNow
+		}
+	}
+}
+
+// Close closes the RPC node client held by the ingester and
+// returns the error reported by that client, if any.
+func (i *ingester) Close() error { return i.node.Close() }
diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go
new file mode 100644
index 0000000..a96d103
--- /dev/null
+++ b/ingester/mainloop_test.go
@@ -0,0 +1,15 @@
+package ingester_test
+
+import "testing"
+
+// notImplemented is the skip reason shared by the placeholder tests below.
+const notImplemented = "not implemented"
+
+// TestBlockConsumptionLoop is a placeholder; it is skipped until implemented.
+func TestBlockConsumptionLoop(t *testing.T) { t.Skip(notImplemented) }
+
+// TestBlockSendingLoop is a placeholder; it is skipped until implemented.
+func TestBlockSendingLoop(t *testing.T) { t.Skip(notImplemented) }
+
+// TestRunLoop is a placeholder; it is skipped until implemented.
+func TestRunLoop(t *testing.T) { t.Skip(notImplemented) }