diff --git a/README.md b/README.md index 6047ba4..dd69531 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,6 @@ You can use our public docker container image and run it as such: ```bash docker run -e BLOCKCHAIN_NAME='foo' -e RPC_NODE_URL='http://localhost:8545' -e DUNE_API_KEY='your-key-here' duneanalytics/node-indexer - ``` @@ -36,7 +35,6 @@ Build the binary for your OS: $ make build $ BLOCKCHAIN_NAME='foo' RPC_NODE_URL='http://localhost:8545' DUNE_API_KEY='your-key-here' ./indexer - ``` ## Configuration Options @@ -44,7 +42,4 @@ $ BLOCKCHAIN_NAME='foo' RPC_NODE_URL='http://localhost:8545' DUNE_API_KEY='your- You can see all the configuration options by using the `--help` argument: ```bash docker run duneanalytics/node-indexer ./indexer --help - ``` - - diff --git a/client/duneapi/client.go b/client/duneapi/client.go index 035e633..18ca280 100644 --- a/client/duneapi/client.go +++ b/client/duneapi/client.go @@ -59,7 +59,11 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line checkRetry := func(ctx context.Context, resp *http.Response, err error) (bool, error) { yes, err2 := retryablehttp.DefaultRetryPolicy(ctx, resp, err) if yes { - log.Warn("Retrying request", "statusCode", resp.Status, "error", err) + if resp == nil { + log.Warn("Retrying request", "error", err) + } else { + log.Warn("Retrying request", "statusCode", resp.Status, "error", err) + } } return yes, err2 } @@ -191,11 +195,6 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques return err } - err = json.NewDecoder(resp.Body).Decode(&response) - if err != nil { - return err - } - return nil } @@ -220,7 +219,7 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch defer func() { if err != nil { c.log.Error("Sending progress report failed", - "lastIngestedBlockNumer", request.LastIngestedBlockNumber, + "lastIngestedBlockNumber", request.LastIngestedBlockNumber, "error", err, "statusCode", responseStatus, "duration", time.Since(start), @@ -228,7 +227,7 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch ) } else { c.log.Info("Sent progress report", - "lastIngestedBlockNumer", request.LastIngestedBlockNumber, + "lastIngestedBlockNumber", request.LastIngestedBlockNumber, "latestBlockNumber", request.LatestBlockNumber, "duration", time.Since(start), ) diff --git a/cmd/main.go b/cmd/main.go index 6a580cd..72070b3 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -65,8 +65,9 @@ func main() { // Get stored progress unless config indicates we should start from 0 var startBlockNumber int64 // Default to -1 to start where the ingester left off + var progress *models.BlockchainIndexProgress if cfg.BlockHeight == -1 { - progress, err := duneClient.GetProgressReport(ctx) + progress, err = duneClient.GetProgressReport(ctx) if err != nil { stdlog.Fatal(err) } else { @@ -82,12 +83,15 @@ func main() { rpcClient, duneClient, ingester.Config{ - MaxBatchSize: cfg.Concurrency, + MaxConcurrentRequests: cfg.RPCConcurrency, ReportProgressInterval: cfg.ReportProgressInterval, PollInterval: cfg.PollInterval, Stack: cfg.RPCStack, BlockchainName: cfg.BlockchainName, + BlockSubmitInterval: cfg.BlockSubmitInterval, + SkipFailedBlocks: cfg.SkipFailedBlocks, }, + progress, ) wg.Add(1) diff --git a/config/config.go b/config/config.go index b0f71ce..ada04c5 100644 --- a/config/config.go +++ b/config/config.go @@ -39,8 +39,10 @@ type Config struct { PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll RPCNode RPCClient - RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll - Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers" default:"5"` // nolint:lll + RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll + RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent requests to the RPC node" default:"10"` // nolint:lll + BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"1s"` // nolint:lll + SkipFailedBlocks bool `long:"skip-failed-blocks" env:"SKIP_FAILED_BLOCKS" description:"Skip failed blocks when submitting to Dune"` // nolint:lll } func (c Config) HasError() error { diff --git a/ingester/ingester.go b/ingester/ingester.go index fbf3061..e9eeea0 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -44,11 +44,13 @@ const ( ) type Config struct { - MaxBatchSize int + MaxConcurrentRequests int PollInterval time.Duration ReportProgressInterval time.Duration Stack models.EVMStack BlockchainName string + BlockSubmitInterval time.Duration + SkipFailedBlocks bool } type Info struct { @@ -60,9 +62,9 @@ type Info struct { } type ErrorInfo struct { - Timestamp time.Time - BlockNumber int64 - Error error + Timestamp time.Time + BlockNumbers string + Error error } type ingester struct { @@ -73,16 +75,27 @@ type ingester struct { info Info } -func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.BlockchainIngester, cfg Config) Ingester { +func New( + log *slog.Logger, + node jsonrpc.BlockchainClient, + dune duneapi.BlockchainIngester, + cfg Config, + progress *models.BlockchainIndexProgress, +) Ingester { + info := Info{ + RPCErrors: []ErrorInfo{}, + DuneErrors: []ErrorInfo{}, + } + if progress != nil { + info.LatestBlockNumber = progress.LatestBlockNumber + info.IngestedBlockNumber = progress.LastIngestedBlockNumber + } ing := &ingester{ log: log.With("module", "ingester"), node: node, dune: dune, cfg: cfg, - info: Info{ - RPCErrors: []ErrorInfo{}, - DuneErrors: []ErrorInfo{}, - }, + info: info, } if ing.cfg.PollInterval == 0 { ing.cfg.PollInterval = defaultPollInterval diff --git a/ingester/mainloop.go b/ingester/mainloop.go index 4f892dd..4f09dab 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -3,6 +3,7 @@ package ingester import ( "context" "fmt" + "strings" "sync/atomic" "time" @@ -21,15 +22,22 @@ import ( // but buffers them in a map until they can be sent in order. func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error { ctx, cancel := context.WithCancel(ctx) + defer cancel() errGroup, ctx := errgroup.WithContext(ctx) blockNumbers := make(chan int64) defer close(blockNumbers) - blocks := make(chan models.RPCBlock) + + // We buffer the block channel so that RPC requests can be made concurrently with sending blocks to Dune. + // We limit the buffer size to the same number of concurrent requests, so we exert some backpressure. + blocks := make(chan models.RPCBlock, i.cfg.MaxConcurrentRequests) defer close(blocks) // Start MaxBatchSize goroutines to consume blocks concurrently - for range i.cfg.MaxBatchSize { + if i.cfg.MaxConcurrentRequests <= 0 { + return errors.Errorf("MaxConcurrentRequests must be > 0") + } + for range i.cfg.MaxConcurrentRequests { errGroup.Go(func() error { return i.FetchBlockLoop(ctx, blockNumbers, blocks) }) @@ -44,11 +52,10 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int // Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever endBlockNumber := startBlockNumber - 1 + maxCount i.log.Info("Starting ingester", - "max_batch_size", i.cfg.MaxBatchSize, - "run_forever", maxCount <= 0, - "start_block_number", startBlockNumber, - "end_block_number", endBlockNumber, - "batch_size", i.cfg.MaxBatchSize, + "runForever", maxCount <= 0, + "startBlockNumber", startBlockNumber, + "endBlockNumber", endBlockNumber, + "maxConcurrency", i.cfg.MaxConcurrentRequests, ) // Produce block numbers in the main goroutine @@ -136,25 +143,33 @@ func (i *ingester) FetchBlockLoop( "error", err, ) i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{ - Timestamp: time.Now(), - BlockNumber: blockNumber, - Error: err, + Timestamp: time.Now(), + BlockNumbers: fmt.Sprintf("%d", blockNumber), + Error: err, }) - // TODO: should we sleep (backoff) here? - continue + if i.cfg.SkipFailedBlocks { + // We need to send an empty block downstream to indicate that this failed + blocks <- block + continue + } + return err } atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber) getBlockElapsed := time.Since(startTime) - i.log.Info("FetchBlockLoop: Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed) startTime = time.Now() select { case <-ctx.Done(): i.log.Info("FetchBlockLoop: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber) return ctx.Err() case blocks <- block: - i.log.Info("FetchBlockLoop: Sent block", "blockNumber", blockNumber, "elapsed", time.Since(startTime)) + i.log.Info( + "FetchBlockLoop: Got and sent block", + "blockNumber", blockNumber, + "getBlockElapsed", getBlockElapsed, + "sendBlockElapsed", time.Since(startTime), + ) } } } @@ -162,61 +177,127 @@ func (i *ingester) FetchBlockLoop( // SendBlocks to Dune. We receive blocks from the FetchBlockLoop goroutines, potentially out of order. // We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune. -func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startBlockNumber int64) error { - i.log.Info("SendBlocks: Starting to receive blocks") - blocks := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order +func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock, startBlockNumber int64) error { + // Buffer for temporarily storing blocks that have arrived out of order + collectedBlocks := make(map[int64]models.RPCBlock) nextNumberToSend := startBlockNumber + batchTimer := time.NewTicker(i.cfg.BlockSubmitInterval) + defer batchTimer.Stop() + + i.log.Info("SendBlocks: Starting to receive blocks") for { + // Either receive a block, send blocks, or shut down (if the context is done, or the channel is closed). select { case <-ctx.Done(): i.log.Info("SendBlocks: Context canceled, stopping") return ctx.Err() - case block, ok := <-blocksCh: + case block, ok := <-blocks: if !ok { i.log.Info("SendBlocks: Channel is closed, returning") return nil } - - blocks[block.BlockNumber] = block - i.log.Info("SendBlocks: Received block", "blockNumber", block.BlockNumber, "bufferSize", len(blocks)) - - nextNumberToSend = i.trySendCompletedBlocks(ctx, blocks, nextNumberToSend) - i.log.Info("SendBlocks: Sent any completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend) + // We got an empty block from the RPC client goroutine, skip to proceed with the next block + if block.Empty() { + if i.cfg.SkipFailedBlocks { + i.log.Info("Skipping empty block", "nextNumberToSend", nextNumberToSend) + nextNumberToSend++ + continue + } + i.log.Info("Received empty block, exiting", "nextNumberToSend", nextNumberToSend) + return errors.Errorf("empty block received") + } + collectedBlocks[block.BlockNumber] = block + i.log.Info( + "SendBlocks: Received block", + "blockNumber", block.BlockNumber, + "bufferSize", len(collectedBlocks), + ) + case <-batchTimer.C: + var err error + nextNumberToSend, err = i.trySendCompletedBlocks(ctx, collectedBlocks, nextNumberToSend) + if err != nil { + return errors.Errorf("send blocks: %w", err) + } + i.log.Info("SendBlocks: Sent completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend) } } } +const maxBatchSize = 100 + // trySendCompletedBlocks sends all blocks that can be sent in order from the blockMap. // Once we have sent all blocks, if any, we return with the nextNumberToSend. // We return the next numberToSend such that the caller can continue from there. func (i *ingester) trySendCompletedBlocks( ctx context.Context, - blocks map[int64]models.RPCBlock, + collectedBlocks map[int64]models.RPCBlock, nextNumberToSend int64, -) int64 { - // Send this block only if we have sent all previous blocks - for block, ok := blocks[nextNumberToSend]; ok; block, ok = blocks[nextNumberToSend] { - if err := i.dune.SendBlocks(ctx, []models.RPCBlock{block}); err != nil { +) (int64, error) { + // Outer loop: We might need to send multiple batch requests if our buffer is too big + for _, ok := collectedBlocks[nextNumberToSend]; ok; _, ok = collectedBlocks[nextNumberToSend] { + // Collect a blocks of blocks to send, only send those which are in order + blocks := make([]models.RPCBlock, 0, len(collectedBlocks)) + for block, ok := collectedBlocks[nextNumberToSend]; ok; block, ok = collectedBlocks[nextNumberToSend] { + blocks = append(blocks, block) + delete(collectedBlocks, nextNumberToSend) + nextNumberToSend++ + // Don't send more than maxBatchSize blocks + if len(blocks) == maxBatchSize { + break + } + } + + if len(blocks) == 0 { + return nextNumberToSend, nil + } + + // Send the batch + lastBlockNumber := blocks[len(blocks)-1].BlockNumber + if lastBlockNumber != nextNumberToSend-1 { + panic("unexpected last block number") + } + if err := i.dune.SendBlocks(ctx, blocks); err != nil { if errors.Is(err, context.Canceled) { i.log.Info("SendBlocks: Context canceled, stopping") - return nextNumberToSend + return nextNumberToSend, nil + } + if !i.cfg.SkipFailedBlocks { + err := errors.Errorf("failed to send batch: %w", err) + i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err) + return nextNumberToSend, err + } + if i.cfg.SkipFailedBlocks { + // this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap + i.log.Error( + "SendBlocks: Failed to send batch, continuing", + "blockNumberFirst", blocks[0].BlockNumber, + "blockNumberLast", blocks[len(blocks)-1].BlockNumber, + "error", err, + ) + blockNumbers := make([]string, len(blocks)) + for i, block := range blocks { + blockNumbers[i] = fmt.Sprintf("%d", block.BlockNumber) + } + i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{ + Timestamp: time.Now(), + BlockNumbers: strings.Join(blockNumbers, ","), + Error: err, + }) + continue } - // this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap - i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err) - i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{ - Timestamp: time.Now(), - BlockNumber: block.BlockNumber, - Error: err, - }) } else { - i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber) - atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber) + i.log.Info( + "SendBlocks: Sent batch, updating latest ingested block number", + "blockNumberFirst", blocks[0].BlockNumber, + "blockNumberLast", lastBlockNumber, + "batchSize", len(blocks), + ) + + atomic.StoreInt64(&i.info.IngestedBlockNumber, lastBlockNumber) } - // We've sent block N, so increment the pointer - delete(blocks, nextNumberToSend) - nextNumberToSend++ } - return nextNumberToSend + + return nextNumberToSend, nil } func (i *ingester) tryUpdateLatestBlockNumber() int64 { diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index 1bedc47..4fbe583 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -2,6 +2,7 @@ package ingester_test import ( "context" + "fmt" "io" "log/slog" "math/rand" @@ -19,24 +20,34 @@ import ( "golang.org/x/sync/errgroup" ) -func TestRunLoopUntilCancel(t *testing.T) { +func TestRunUntilCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) maxBlockNumber := int64(10) sentBlockNumber := int64(0) producedBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { - if len(blocks) != 1 { - panic("expected 1 block") + if len(blocks) == 0 { + return nil } - block := blocks[0] - atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) - if block.BlockNumber == maxBlockNumber { - // cancel execution when we send the last block + next := sentBlockNumber + 1 + for _, block := range blocks { + // We cannot send blocks out of order to DuneAPI + if block.BlockNumber != next { + panic(fmt.Sprintf("expected block %d, got %d", next, block.BlockNumber)) + } + next++ + } + + lastBlockNumber := blocks[len(blocks)-1].BlockNumber + atomic.StoreInt64(&sentBlockNumber, lastBlockNumber) + if lastBlockNumber >= maxBlockNumber { + // cancel execution when we have sent the last block cancel() return context.Canceled } + return nil }, PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { @@ -61,21 +72,25 @@ func TestRunLoopUntilCancel(t *testing.T) { // Swap these to see logs // logOutput := os.Stderr logOutput := io.Discard - ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{ - MaxBatchSize: 1, - PollInterval: 1000 * time.Millisecond, - }) + ing := ingester.New( + slog.New(slog.NewTextHandler(logOutput, nil)), + rpcClient, + duneapi, + ingester.Config{ + BlockSubmitInterval: time.Nanosecond, + MaxConcurrentRequests: 10, + SkipFailedBlocks: false, + }, + nil, // progress + ) err := ing.Run(ctx, 1, -1) // run until canceled require.ErrorIs(t, err, context.Canceled) // this is expected - require.Equal(t, sentBlockNumber, maxBlockNumber) + require.GreaterOrEqual(t, sentBlockNumber, maxBlockNumber) } func TestProduceBlockNumbers(t *testing.T) { duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlocksFunc: func(_ context.Context, _ []models.RPCBlock) error { - return nil - }, PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { return nil }, @@ -91,11 +106,18 @@ func TestProduceBlockNumbers(t *testing.T) { return nil }, } + // Swap these to see logs logOutput := io.Discard - ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{ - MaxBatchSize: 1, - PollInterval: 1000 * time.Millisecond, - }) + // logOutput := os.Stderr + ing := ingester.New( + slog.New(slog.NewTextHandler(logOutput, nil)), + rpcClient, + duneapi, + ingester.Config{ + BlockSubmitInterval: time.Nanosecond, + }, + nil, // progress + ) blockNumbers := make(chan int64) var wg sync.WaitGroup wg.Add(1) @@ -113,45 +135,59 @@ func TestSendBlocks(t *testing.T) { sentBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { - if len(blocks) != 1 { - panic("expected 1 block") + if len(blocks) == 0 { + return nil } - block := blocks[0] - // DuneAPI must fail if it receives blocks out of order - if block.BlockNumber != sentBlockNumber+1 { - return errors.Errorf("blocks out of order") + next := sentBlockNumber + 1 + for _, block := range blocks { + // We cannot send blocks out of order to DuneAPI + if block.BlockNumber != next { + panic(fmt.Sprintf("expected block %d, got %d", next, block.BlockNumber)) + } + next++ } - atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) - return nil - }, - PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { + + lastBlockNumber := blocks[len(blocks)-1].BlockNumber + atomic.StoreInt64(&sentBlockNumber, lastBlockNumber) return nil }, } + // Swap these to see logs // logOutput := os.Stderr logOutput := io.Discard - ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), nil, duneapi, ingester.Config{ - MaxBatchSize: 10, // this won't matter as we only run SendBlocks - PollInterval: 1000 * time.Millisecond, - }) + ing := ingester.New( + slog.New(slog.NewTextHandler(logOutput, nil)), + nil, // node client isn't used in this unit test + duneapi, + ingester.Config{ + BlockSubmitInterval: time.Nanosecond, + }, + nil, // progress + ) blocks := make(chan models.RPCBlock) startFromBlock := 1 - group, _ := errgroup.WithContext(context.Background()) + group, ctx := errgroup.WithContext(context.Background()) group.Go(func() error { return ing.SendBlocks(context.Background(), blocks, int64(startFromBlock)) }) // Send blocks except the next block, ensure none are sent to the API for _, n := range []int64{2, 3, 4, 5, 10} { - blocks <- models.RPCBlock{BlockNumber: n} + select { + case <-ctx.Done(): // if error group fails, its context is canceled + require.Fail(t, "context was canceled") + case blocks <- models.RPCBlock{BlockNumber: n, Payload: []byte("block")}: + // Sent block + } require.Equal(t, int64(0), sentBlockNumber) } // Now send the first block - blocks <- models.RPCBlock{BlockNumber: 1} + blocks <- models.RPCBlock{BlockNumber: 1, Payload: []byte("block")} + time.Sleep(time.Millisecond) // Allow enough time for the tick before closing the channel close(blocks) require.NoError(t, group.Wait()) @@ -159,29 +195,41 @@ func TestSendBlocks(t *testing.T) { require.Equal(t, int64(5), sentBlockNumber) } -// TestRunLoopBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order +// TestRunBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order // even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time. -func TestRunLoopBlocksOutOfOrder(t *testing.T) { +func TestRunBlocksOutOfOrder(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) maxBlockNumber := int64(1000) sentBlockNumber := int64(0) producedBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error { - if len(blocks) != 1 { - panic("expected 1 block") + if len(blocks) == 0 { + return nil } - block := blocks[0] - // Test must fail if DuneAPI receives blocks out of order - require.Equal(t, block.BlockNumber, sentBlockNumber+1) + // Fail if we're not sending a batch of blocks + if len(blocks) == 1 { + panic("expected batch of blocks, got 1") + } + + next := sentBlockNumber + 1 + for _, block := range blocks { + // We cannot send blocks out of order to DuneAPI + if block.BlockNumber != next { + panic(fmt.Sprintf("expected block %d, got %d", next, block.BlockNumber)) + } + next++ + } - atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) - if block.BlockNumber == maxBlockNumber { - // cancel execution when we send the last block + lastBlockNumber := blocks[len(blocks)-1].BlockNumber + atomic.StoreInt64(&sentBlockNumber, lastBlockNumber) + if lastBlockNumber >= maxBlockNumber { + // cancel execution when we have sent the last block cancel() return context.Canceled } + return nil }, PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { @@ -193,10 +241,10 @@ func TestRunLoopBlocksOutOfOrder(t *testing.T) { return maxBlockNumber + 1, nil }, BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) { - // Get blocks out of order by sleeping for a random amount of ms - time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) + // Get blocks out of order by sleeping for a random amount of time + time.Sleep(time.Duration(rand.Intn(10)) * time.Nanosecond) atomic.StoreInt64(&producedBlockNumber, blockNumber) - return models.RPCBlock{BlockNumber: blockNumber}, nil + return models.RPCBlock{BlockNumber: blockNumber, Payload: []byte("block")}, nil }, CloseFunc: func() error { return nil @@ -205,12 +253,86 @@ func TestRunLoopBlocksOutOfOrder(t *testing.T) { // Swap these to see logs // logOutput := os.Stderr logOutput := io.Discard - ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{ - MaxBatchSize: 10, // fetch blocks in multiple goroutines - PollInterval: 1000 * time.Millisecond, - }) + ing := ingester.New( + slog.New(slog.NewTextHandler(logOutput, nil)), + rpcClient, + duneapi, + ingester.Config{ + MaxConcurrentRequests: 10, // fetch blocks in multiple goroutines + // big enough compared to the time spent in block by number to ensure batching. We panic + // in the mocked Dune client if we don't get a batch of blocks (more than one block). + BlockSubmitInterval: 50 * time.Millisecond, + SkipFailedBlocks: false, + }, + nil, // progress + ) err := ing.Run(ctx, 1, -1) // run until canceled require.ErrorIs(t, err, context.Canceled) // this is expected - require.Equal(t, sentBlockNumber, maxBlockNumber) + require.GreaterOrEqual(t, sentBlockNumber, maxBlockNumber) +} + +// TestRunRPCNodeFails shows that we crash if the RPC client fails to fetch a block +func TestRunRPCNodeFails(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + maxBlockNumber := int64(1000) + producedBlockNumber := int64(0) + someRPCError := errors.Errorf("some RPC error") + duneapi := &duneapi_mock.BlockchainIngesterMock{ + SendBlocksFunc: func(_ context.Context, _ []models.RPCBlock) error { + return someRPCError + }, + PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { + return nil + }, + } + rpcClient := &jsonrpc_mock.BlockchainClientMock{ + LatestBlockNumberFunc: func() (int64, error) { + return maxBlockNumber + 1, nil + }, + BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) { + // Get blocks out of order by sleeping for a random amount of time + time.Sleep(time.Duration(rand.Intn(10)) * time.Nanosecond) + atomic.StoreInt64(&producedBlockNumber, blockNumber) + return models.RPCBlock{BlockNumber: blockNumber, Payload: []byte("block")}, nil + }, + CloseFunc: func() error { + return nil + }, + } + // Swap these to see logs + // logOutput := os.Stderr + logOutput := io.Discard + ing := ingester.New( + slog.New(slog.NewTextHandler(logOutput, nil)), + rpcClient, + duneapi, + ingester.Config{ + MaxConcurrentRequests: 10, + BlockSubmitInterval: time.Millisecond, + SkipFailedBlocks: false, + }, + nil, // progress + ) + + err := ing.Run(ctx, 1, -1) // run until canceled + require.ErrorIs(t, err, someRPCError) +} + +// TestRunRPCNodeFails shows that we crash if the RPC client fails to fetch a block +func TestRunFailsIfNoConcurrentRequests(t *testing.T) { + logOutput := io.Discard + ing := ingester.New( + slog.New(slog.NewTextHandler(logOutput, nil)), + nil, + nil, + ingester.Config{ + MaxConcurrentRequests: 0, + }, + nil, // progress + ) + + err := ing.Run(context.Background(), 1, -1) // run until canceled + require.ErrorContains(t, err, "MaxConcurrentRequests must be > 0") } diff --git a/models/block.go b/models/block.go index ffb410c..1435a8d 100644 --- a/models/block.go +++ b/models/block.go @@ -5,3 +5,7 @@ type RPCBlock struct { // agnostic blob of data that is the block Payload []byte } + +func (b RPCBlock) Empty() bool { + return len(b.Payload) == 0 +}