From b0975b04050814d0f8791976e9f9c9daa64f409b Mon Sep 17 00:00:00 2001
From: Vegard Stikbakke
Date: Fri, 21 Jun 2024 08:41:53 +0200
Subject: [PATCH] Fetch blocks concurrently but send in order

---
 cmd/main.go               |   2 +-
 config/config.go          |   1 +
 ingester/ingester.go      |  13 +-
 ingester/mainloop.go      | 222 ++++++++++++++++++------------
 ingester/mainloop_test.go | 274 ++++++++++++++++++--------------------
 5 files changed, 278 insertions(+), 234 deletions(-)

diff --git a/cmd/main.go b/cmd/main.go
index 95fc4a0..6a580cd 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -82,7 +82,7 @@ func main() {
 		rpcClient,
 		duneClient,
 		ingester.Config{
-			MaxBatchSize:           1,
+			MaxBatchSize:           cfg.Concurrency,
 			ReportProgressInterval: cfg.ReportProgressInterval,
 			PollInterval:           cfg.PollInterval,
 			Stack:                  cfg.RPCStack,
diff --git a/config/config.go b/config/config.go
index aa443fe..b0641e6 100644
--- a/config/config.go
+++ b/config/config.go
@@ -40,6 +40,7 @@ type Config struct {
 	ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
 	RPCNode                RPCClient
 	RPCStack               models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
+	Concurrency            int             `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers"` // nolint:lll
 }
 
 func (c Config) HasError() error {
diff --git a/ingester/ingester.go b/ingester/ingester.go
index a5f2743..216464b 100644
--- a/ingester/ingester.go
+++ b/ingester/ingester.go
@@ -14,17 +14,22 @@ type Ingester interface {
 	// Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested
 	Run(ctx context.Context, startBlockNumber int64, maxCount int64) error
 
-	// ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
+	// ProduceBlockNumbers sends block numbers from startBlockNumber to endBlockNumber to outChan, inclusive.
 	// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
 	// it will run continuously until the context is cancelled
-	ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber int64, endBlockNumber int64) error
+	ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber int64, endBlockNumber int64) error
+
+	// ConsumeBlocks fetches the block for each block number received on the first channel and
+	// sends the fetched block on the second channel. It runs until the context is cancelled
+	// or the block number channel is closed, and can safely be run in multiple goroutines.
+	ConsumeBlocks(context.Context, chan int64, chan models.RPCBlock) error
 
 	// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
 	// it will block until:
 	// - the context is cancelled
 	// - channel is closed
 	// - a fatal error occurs
-	SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error
+	SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startFrom int64) error
 
 	// This is just a placeholder for now
 	Info() Info
@@ -33,7 +38,7 @@
 }
 
 const (
-	defaultMaxBatchSize           = 1
+	defaultMaxBatchSize           = 5
 	defaultPollInterval           = 1 * time.Second
 	defaultReportProgressInterval = 30 * time.Second
 )
diff --git a/ingester/mainloop.go b/ingester/mainloop.go
index 0db27a7..4cf4f30 100644
--- a/ingester/mainloop.go
+++ b/ingester/mainloop.go
@@ -11,54 +11,64 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
+// Run fetches blocks from a node RPC and sends them in order to the Dune API.
+//
+// ProduceBlockNumbers (blockNumbers channel) -> ConsumeBlocks (blocks channel) -> SendBlocks -> Dune
+//
+// We produce block numbers to fetch on an unbuffered channel (ProduceBlockNumbers),
+// and each concurrent ConsumeBlocks goroutine gets a block number from that channel.
+// The SendBlocks goroutine receives all blocks on an unbuffered channel,
+// but buffers them in a map until they can be sent in order.
 func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error {
 	ctx, cancel := context.WithCancel(ctx)
+	errGroup, ctx := errgroup.WithContext(ctx)
 
-	inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize) // we close this after ConsumeBlocks has returned
-
-	// Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever
-	endBlockNumber := startBlockNumber - 1 + maxCount
-
-	i.log.Info("Starting ingester",
-		"maxBatchSize", i.cfg.MaxBatchSize,
-		"startBlockNumber", startBlockNumber,
-		"endBlockNumber", endBlockNumber,
-		"maxCount", maxCount,
-	)
+	blockNumbers := make(chan int64)
+	defer close(blockNumbers)
+	blocks := make(chan models.RPCBlock)
+	defer close(blocks)
 
-	errGroup, ctx := errgroup.WithContext(ctx)
+	// Start MaxBatchSize goroutines to consume blocks concurrently
+	for range i.cfg.MaxBatchSize {
+		errGroup.Go(func() error {
+			return i.ConsumeBlocks(ctx, blockNumbers, blocks)
+		})
+	}
 	errGroup.Go(func() error {
-		return i.SendBlocks(ctx, inFlightChan)
+		return i.ReportProgress(ctx)
 	})
 	errGroup.Go(func() error {
-		return i.ReportProgress(ctx)
+		return i.SendBlocks(ctx, blocks, startBlockNumber)
 	})
 
-	err := i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, endBlockNumber)
-	close(inFlightChan)
-	cancel()
-	if err != nil {
-		if err := errGroup.Wait(); err != nil {
-			i.log.Error("errgroup wait", "error", err)
-		}
-		return errors.Errorf("consume blocks: %w", err)
-	}
+	// Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever
+	endBlockNumber := startBlockNumber - 1 + maxCount
+	i.log.Info("Starting ingester",
+		"max_batch_size", i.cfg.MaxBatchSize,
+		"run_forever", maxCount <= 0,
+		"start_block_number", startBlockNumber,
+		"end_block_number", endBlockNumber,
+	)
 
-	if err := errGroup.Wait(); err != nil && err != ErrFinishedConsumeBlocks {
-		return err
-	}
+	// Produce block numbers in the main goroutine
+	err := i.ProduceBlockNumbers(ctx, blockNumbers, startBlockNumber, endBlockNumber)
+	i.log.Info("ProduceBlockNumbers is done, cancelling context", "error", err)
+	cancel()
 
-	return nil
+	return errGroup.Wait()
 }
 
 var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")
 
-// ConsumeBlocks from the NPC Node
-func (i *ingester) ConsumeBlocks(
-	ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64,
+// ProduceBlockNumbers to be consumed by multiple goroutines running ConsumeBlocks
+func (i *ingester) ProduceBlockNumbers(
+	ctx context.Context, blockNumbers chan int64, startBlockNumber int64, endBlockNumber int64,
 ) error {
 	latestBlockNumber := i.tryUpdateLatestBlockNumber()
 
+	// Helper function that blocks until blockNumber is available on the chain
 	waitForBlock := func(ctx context.Context, blockNumber int64, latestBlockNumber int64) int64 {
 		for blockNumber > latestBlockNumber {
 			select {
@@ -77,82 +87,126 @@ func (i *ingester) ConsumeBlocks(
 	// Consume blocks forever if end is before start. This happens if Run is called with a maxCount of <= 0
 	dontStop := endBlockNumber < startBlockNumber
-
+	i.log.Info("Producing block numbers", "startBlockNumber", startBlockNumber, "endBlockNumber", endBlockNumber)
 	for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {
 		latestBlockNumber = waitForBlock(ctx, blockNumber, latestBlockNumber)
-		startTime := time.Now()
-
-		i.log.Info("Getting block by number", "blockNumber", blockNumber, "latestBlockNumber", latestBlockNumber)
-		block, err := i.node.BlockByNumber(ctx, blockNumber)
-		if err != nil {
-			if errors.Is(err, context.Canceled) {
-				i.log.Info("Context canceled, stopping..")
-				return ctx.Err()
-			}
-
-			i.log.Error("Failed to get block by number, continuing..",
-				"blockNumber", blockNumber,
-				"latestBlockNumber", latestBlockNumber,
-				"error", err,
-			)
-			i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
-				Timestamp:   time.Now(),
-				BlockNumber: blockNumber,
-				Error:       err,
-			})
-
-			// TODO: should I sleep (backoff) here?
-			continue
-		}
-
-		atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
-		getBlockElapsed := time.Since(startTime)
 
 		select {
 		case <-ctx.Done():
+			i.log.Info("ProduceBlockNumbers: Context canceled, stopping")
 			return ctx.Err()
-		case outChan <- block:
+		case blockNumbers <- blockNumber:
 		}
 
-		distanceFromLatest := latestBlockNumber - block.BlockNumber
+		distanceFromLatest := latestBlockNumber - blockNumber
 		if distanceFromLatest > 0 {
 			// TODO: improve logs of processing speed and catchup estimated ETA
 			i.log.Info("We're behind, trying to catch up..",
-				"blockNumber", block.BlockNumber,
+				"blockNumber", blockNumber,
 				"latestBlockNumber", latestBlockNumber,
 				"distanceFromLatest", distanceFromLatest,
-				"getBlockElapsedMillis", getBlockElapsed.Milliseconds(),
-				"elapsedMillis", time.Since(startTime).Milliseconds(),
 			)
 		}
 	}
-	// Done consuming blocks, either because we reached the endBlockNumber or the context was canceled
-	i.log.Info("Finished consuming blocks", "latestBlockNumber", latestBlockNumber, "endBlockNumber", endBlockNumber)
+	i.log.Info("Finished producing block numbers")
 	return ErrFinishedConsumeBlocks
 }
 
-func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
-	for payload := range blocksCh {
-		// TODO: we should batch RCP blocks here before sending to Dune.
-		if err := i.dune.SendBlock(ctx, payload); err != nil {
-			if errors.Is(err, context.Canceled) {
-				i.log.Info("Context canceled, stopping..")
+// ConsumeBlocks from the RPC node. This can be run in multiple goroutines to parallelize block fetching.
+func (i *ingester) ConsumeBlocks(
+	ctx context.Context, blockNumbers chan int64, blocks chan models.RPCBlock,
+) error {
+	for {
+		select {
+		case <-ctx.Done():
+			i.log.Info("ConsumeBlocks: Context is done")
+			return ctx.Err()
+		case blockNumber := <-blockNumbers:
+			startTime := time.Now()
+
+			i.log.Info("Getting block by number", "blockNumber", blockNumber)
+			block, err := i.node.BlockByNumber(ctx, blockNumber)
+			if err != nil {
+				if errors.Is(err, context.Canceled) {
+					i.log.Info("ConsumeBlocks: Context canceled, stopping")
+					return ctx.Err()
+				}
+
+				i.log.Error("Failed to get block by number, continuing..",
+					"blockNumber", blockNumber,
+					"error", err,
+				)
+				i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
+					Timestamp:   time.Now(),
+					BlockNumber: blockNumber,
+					Error:       err,
+				})
+
+				// TODO: should we sleep (backoff) here?
+				continue
+			}
+
+			atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
+			getBlockElapsed := time.Since(startTime)
+			i.log.Info("Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed)
+			select {
+			case <-ctx.Done():
+				i.log.Info("ConsumeBlocks: Context canceled, not sending block", "blockNumber", block.BlockNumber)
 				return ctx.Err()
+			case blocks <- block:
+				i.log.Info("Sent block", "blockNumber", block.BlockNumber)
+			}
+		}
+	}
+}
+
+// SendBlocks to Dune. We receive blocks from the ConsumeBlocks goroutines, potentially out of order.
+// We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune.
+func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startBlockNumber int64) error {
+	i.log.Info("SendBlocks: Starting to receive blocks")
+	blockMap := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order
+	next := startBlockNumber
+	for {
+		select {
+		case <-ctx.Done():
+			i.log.Info("SendBlocks: Context canceled, stopping")
+			return ctx.Err()
+		case block, ok := <-blocksCh:
+			if !ok {
+				i.log.Info("SendBlocks: Channel is closed, returning")
+				return nil
+			}
+
+			blockMap[block.BlockNumber] = block
+			i.log.Info("Received block", "blockNumber", block.BlockNumber)
+
+			// Send as many consecutive buffered blocks as possible, starting from `next`
+			for block, ok := blockMap[next]; ok; block, ok = blockMap[next] {
+				i.log.Info("SendBlocks: Sending block to DuneAPI", "blockNumber", block.BlockNumber)
+				if err := i.dune.SendBlock(ctx, block); err != nil {
+					if errors.Is(err, context.Canceled) {
+						i.log.Info("SendBlocks: Context canceled, stopping")
+						return ctx.Err()
+					}
+					// TODO: implement DeadLetterQueue
+					// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
+					i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err)
+					i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
+						Timestamp:   time.Now(),
+						BlockNumber: block.BlockNumber,
+						Error:       err,
+					})
+				} else {
+					i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber)
+					atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
+				}
+
+				// We've handled this block number, so drop it from the buffer and advance
+				delete(blockMap, next)
+				next++
 			}
-			// TODO: implement DeadLetterQueue
-			// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
-			i.log.Error("SendBlock failed, continuing..", "blockNumber", payload.BlockNumber, "error", err)
-			i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
-				Timestamp:   time.Now(),
-				BlockNumber: payload.BlockNumber,
-				Error:       err,
-			})
-		} else {
-			i.log.Info("Updating latest ingested block number", "blockNumber", payload.BlockNumber)
-			atomic.StoreInt64(&i.info.IngestedBlockNumber, payload.BlockNumber)
 		}
 	}
-	return ctx.Err() // channel closed
 }
 
 func (i *ingester) tryUpdateLatestBlockNumber() int64 {
@@ -183,7 +237,7 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
 		blocksPerSec := float64(lastIngested-previousIngested) / tNow.Sub(previousTime).Seconds()
 		newDistance := latest - lastIngested
-		fallingBehind := newDistance > (previousDistance + 1) // TODO: make is more stable
+		fallingBehind := newDistance > (previousDistance + 1) // TODO: make this more stable
 
 		rpcErrors := len(i.info.RPCErrors)
 		duneErrors := len(i.info.DuneErrors)
diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go
index e2a969d..907c2af 100644
--- a/ingester/mainloop_test.go
+++ b/ingester/mainloop_test.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"io"
 	"log/slog"
+	"math/rand"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -14,170 +16,153 @@ import (
 	"github.com/duneanalytics/blockchain-ingester/models"
 	"github.com/go-errors/errors"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 )
 
-func TestBlockConsumptionLoopErrors(t *testing.T) {
-	testcases := []struct {
-		name                  string
-		LatestIsBroken        bool
-		BlockByNumberIsBroken bool
-	}{
-		{
-			name:                  "we're up to date, following the head",
-			LatestIsBroken:        false,
-			BlockByNumberIsBroken: false,
-		},
-		{
-			name:                  "the RPC node is broken, all API calls are failing",
-			LatestIsBroken:        true,
-			BlockByNumberIsBroken: true,
-		},
-		{
-			name:                  "BlockByNumber, a specific jsonRPC on the RPC node is broken",
-			LatestIsBroken:        false,
-			BlockByNumberIsBroken: true,
+func TestRunLoopUntilCancel(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	maxBlockNumber := int64(10)
+	sentBlockNumber := int64(0)
+	producedBlockNumber := int64(0)
+	duneapi := &duneapi_mock.BlockchainIngesterMock{
+		SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
+			atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
+			if block.BlockNumber == maxBlockNumber {
+				// cancel execution when we send the last block
+				cancel()
+				return context.Canceled
+			}
+			return nil
+		},
+		PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
+			return nil
 		},
 	}
-
-	for _, tc := range testcases {
-		t.Run(tc.name, func(t *testing.T) {
-			if tc.LatestIsBroken {
-				t.Skip("latest block number is broken, we don't behave correctly yet")
-			}
-			ctx, cancel := context.WithCancel(context.Background())
-			maxBlockNumber := int64(100)
-			producedBlockNumber := int64(0)
-			rpcClient := &jsonrpc_mock.BlockchainClientMock{
-				LatestBlockNumberFunc: func() (int64, error) {
-					if tc.LatestIsBroken {
-						return 0, errors.New("latest block number is broken")
-					}
-					return maxBlockNumber, nil
-				},
-				BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
-					if tc.BlockByNumberIsBroken {
-						return models.RPCBlock{}, errors.New("block by number is broken")
-					}
-					if blockNumber > maxBlockNumber {
-						// end tests
-						cancel()
-					}
-					atomic.StoreInt64(&producedBlockNumber, blockNumber)
-					return models.RPCBlock{
-						BlockNumber: blockNumber,
-						Payload:     []byte(`block`),
-					}, nil
-				},
-				CloseFunc: func() error {
-					return nil
-				},
-			}
-			// Swap these to see logs
-			// logOutput := os.Stderr
-			logOutput := io.Discard
-			ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, nil, ingester.Config{
-				MaxBatchSize: 1,
-				PollInterval: 1000 * time.Millisecond,
-			})
-
-			outCh := make(chan models.RPCBlock, maxBlockNumber+1)
-			err := ing.ConsumeBlocks(ctx, outCh, 0, maxBlockNumber)
-			require.ErrorIs(t, err, ingester.ErrFinishedConsumeBlocks)
-			if tc.BlockByNumberIsBroken {
-				require.Equal(t, producedBlockNumber, int64(0))
-			}
-		})
+	rpcClient := &jsonrpc_mock.BlockchainClientMock{
+		LatestBlockNumberFunc: func() (int64, error) {
+			return maxBlockNumber + 1, nil
+		},
+		BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
+			atomic.StoreInt64(&producedBlockNumber, blockNumber)
+			return models.RPCBlock{
+				BlockNumber: blockNumber,
+				Payload:     []byte(`block`),
+			}, nil
+		},
+		CloseFunc: func() error {
+			return nil
+		},
 	}
+	// Swap these to see logs
+	// logOutput := os.Stderr
+	logOutput := io.Discard
+	ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{
+		MaxBatchSize: 1,
+		PollInterval: 1000 * time.Millisecond,
+	})
+
+	err := ing.Run(ctx, 1, -1)                 // run until canceled
+	require.ErrorIs(t, err, context.Canceled)  // this is expected
+	require.Equal(t, maxBlockNumber, sentBlockNumber)
 }
 
-func TestBlockSendingLoop(t *testing.T) {
-	testcases := []string{
-		"we're up to date, following the head",
-		"we're failing intermittently, the Dune API is broken",
-		"we're erroring systematically, the Dune API is down",
+func TestProduceBlockNumbers(t *testing.T) {
+	duneapi := &duneapi_mock.BlockchainIngesterMock{
+		SendBlockFunc: func(_ context.Context, _ models.RPCBlock) error {
+			return nil
+		},
+		PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
+			return nil
+		},
+	}
+	rpcClient := &jsonrpc_mock.BlockchainClientMock{
+		LatestBlockNumberFunc: func() (int64, error) {
+			return 100_000, nil
+		},
+		BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
+			return models.RPCBlock{BlockNumber: blockNumber}, nil
+		},
+		CloseFunc: func() error {
+			return nil
+		},
 	}
-	for _, testcase := range testcases {
-		t.Run(testcase, func(t *testing.T) {
-			t.Skip("not implemented")
-		})
+	logOutput := io.Discard
+	ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{
+		MaxBatchSize: 1,
+		PollInterval: 1000 * time.Millisecond,
+	})
+	blockNumbers := make(chan int64)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		ing.ProduceBlockNumbers(context.Background(), blockNumbers, 1, 100_000)
+	}()
+	for i := 1; i <= 100_000; i++ {
+		require.Equal(t, int64(i), <-blockNumbers)
 	}
+	wg.Wait()
 }
 
-func TestRunLoopBaseCase(t *testing.T) {
-	testCases := []struct {
-		name             string
-		maxCount         int64
-		lastIngested     int64
-		expectedEndBlock int64
-	}{
-		{name: "1 block", maxCount: 1, lastIngested: 0, expectedEndBlock: 1},
-		{name: "2 blocks", maxCount: 2, lastIngested: 0, expectedEndBlock: 2},
-		{name: "100 blocks", maxCount: 100, lastIngested: 0, expectedEndBlock: 100},
-		{name: "100 blocks, starting from 50", maxCount: 100, lastIngested: 50, expectedEndBlock: 150},
-	}
-	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			latestBlockNumber := int64(1000)
-			ingestedBlockNumber := int64(0)
-
-			rpcClient := &jsonrpc_mock.BlockchainClientMock{
-				LatestBlockNumberFunc: func() (int64, error) {
-					return latestBlockNumber, nil
-				},
-				BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
-					atomic.StoreInt64(&latestBlockNumber, blockNumber)
-					return models.RPCBlock{
-						BlockNumber: blockNumber,
-						Payload:     []byte(`block`),
-					}, nil
-				},
-				CloseFunc: func() error {
-					return nil
-				},
+func TestSendBlocks(t *testing.T) {
+	sentBlockNumber := int64(0)
+	duneapi := &duneapi_mock.BlockchainIngesterMock{
+		SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
+			// DuneAPI must fail if it receives blocks out of order
+			if block.BlockNumber != sentBlockNumber+1 {
+				return errors.Errorf("blocks out of order")
 			}
+			atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
+			return nil
+		},
+		PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
+			return nil
+		},
+	}
+	// logOutput := os.Stderr
+	logOutput := io.Discard
+	ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), nil, duneapi, ingester.Config{
+		MaxBatchSize: 10, // this won't matter as we only run SendBlocks
+		PollInterval: 1000 * time.Millisecond,
+	})
-
-			duneapi := &duneapi_mock.BlockchainIngesterMock{
-				SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
-					atomic.StoreInt64(&ingestedBlockNumber, block.BlockNumber)
-					return nil
-				},
-				PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
-					return nil
-				},
-			}
+	blocks := make(chan models.RPCBlock)
 
-			// Swap these to see logs
-			// logOutput := os.Stderr
-			logOutput := io.Discard
-			ing := ingester.New(
-				slog.New(slog.NewTextHandler(logOutput, nil)),
-				rpcClient,
-				duneapi,
-				ingester.Config{
-					MaxBatchSize: 1,
-					PollInterval: 1000 * time.Millisecond,
-				},
-			)
+	startFromBlock := 1
 
-			ctx, cancel := context.WithCancel(context.Background())
-			defer cancel()
+	group, _ := errgroup.WithContext(context.Background())
+	group.Go(func() error {
+		return ing.SendBlocks(context.Background(), blocks, int64(startFromBlock))
+	})
 
-			startBlockNumber := tc.lastIngested + 1
-			err := ing.Run(ctx, startBlockNumber, tc.maxCount)
-			require.ErrorIs(t, err, ingester.ErrFinishedConsumeBlocks)
-			// require.Equal(t, tc.lastIngested+tc.maxCount, producedBlockNumber)
-			require.Equal(t, tc.expectedEndBlock, atomic.LoadInt64(&ingestedBlockNumber))
-		})
+	// Send out-of-order blocks, skipping the first expected block; ensure none are sent to the API
+	for _, n := range []int64{2, 3, 4, 5, 10} {
+		blocks <- models.RPCBlock{BlockNumber: n}
+		require.Equal(t, int64(0), atomic.LoadInt64(&sentBlockNumber))
 	}
+	// Now send the first block
+	blocks <- models.RPCBlock{BlockNumber: 1}
+	close(blocks)
+	require.NoError(t, group.Wait())
+
+	// Ensure the last correct block was sent (block 10 stays buffered because 6-9 never arrived)
+	require.Equal(t, int64(5), sentBlockNumber)
 }
 
-func TestRunLoopUntilCancel(t *testing.T) {
+// TestRunLoopUntilBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
+// even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time.
+func TestRunLoopUntilBlocksOutOfOrder(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
-	maxBlockNumber := int64(10)
+	maxBlockNumber := int64(1000)
 	sentBlockNumber := int64(0)
 	producedBlockNumber := int64(0)
 	duneapi := &duneapi_mock.BlockchainIngesterMock{
 		SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
+			// DuneAPI must fail if it receives blocks out of order
+			if block.BlockNumber != sentBlockNumber+1 {
+				return errors.Errorf("blocks out of order")
+			}
+
 			atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
 			if block.BlockNumber == maxBlockNumber {
 				// cancel execution when we send the last block
@@ -195,11 +180,10 @@ func TestRunLoopUntilCancel(t *testing.T) {
 			return maxBlockNumber + 1, nil
 		},
 		BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
+			// Get blocks out of order by sleeping for a random number of milliseconds
+			time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
 			atomic.StoreInt64(&producedBlockNumber, blockNumber)
-			return models.RPCBlock{
-				BlockNumber: blockNumber,
-				Payload:     []byte(`block`),
-			}, nil
+			return models.RPCBlock{BlockNumber: blockNumber}, nil
 		},
 		CloseFunc: func() error {
 			return nil
@@ -209,7 +193,7 @@ func TestRunLoopUntilCancel(t *testing.T) {
 	// logOutput := os.Stderr
 	logOutput := io.Discard
 	ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{
-		MaxBatchSize: 1,
+		MaxBatchSize: 10, // fetch blocks in multiple goroutines
 		PollInterval: 1000 * time.Millisecond,
 	})
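
Note for reviewers: the ordering logic in SendBlocks is easiest to study in isolation. Below is a minimal, self-contained Go sketch of the same fetch-concurrently/send-in-order pattern this patch implements. It is illustrative only: the names (rpcBlock, workers, the fmt.Println standing in for the Dune API call) are hypothetical and not part of the ingester's actual API.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// rpcBlock is a stand-in for models.RPCBlock.
type rpcBlock struct {
	number int64
}

func main() {
	const workers = 5
	const maxBlock = int64(20)

	blockNumbers := make(chan int64)
	blocks := make(chan rpcBlock)

	// Producer: emit block numbers in increasing order (like ProduceBlockNumbers).
	go func() {
		defer close(blockNumbers)
		for n := int64(1); n <= maxBlock; n++ {
			blockNumbers <- n
		}
	}()

	// Workers: "fetch" blocks concurrently (like ConsumeBlocks); the random
	// sleep makes completion, and thus arrival, order nondeterministic.
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range blockNumbers {
				time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
				blocks <- rpcBlock{number: n}
			}
		}()
	}
	// Close the blocks channel once every worker is done.
	go func() {
		wg.Wait()
		close(blocks)
	}()

	// Collector: buffer out-of-order arrivals in a map and emit strictly
	// in order (like SendBlocks).
	buffer := make(map[int64]rpcBlock)
	next := int64(1)
	for b := range blocks {
		buffer[b.number] = b
		// Drain every consecutive block starting at `next`.
		for got, ok := buffer[next]; ok; got, ok = buffer[next] {
			fmt.Println("sending block", got.number) // stand-in for the Dune API call
			delete(buffer, next)
			next++
		}
	}
}

The map-plus-next-pointer buffer is the key design choice: workers may finish in any order, yet the downstream consumer always observes a strictly increasing block sequence, with memory usage bounded by how far ahead the fastest worker runs.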