From c768a8f786cfe9717131b9396a2a7a79b6637d46 Mon Sep 17 00:00:00 2001 From: Vegard Stikbakke Date: Sat, 22 Jun 2024 07:17:14 +0200 Subject: [PATCH] Some naming improvements in mainloop.go --- ingester/ingester.go | 4 +- ingester/mainloop.go | 82 +++++++++++++++++++++++---------------- ingester/mainloop_test.go | 6 +-- 3 files changed, 52 insertions(+), 40 deletions(-) diff --git a/ingester/ingester.go b/ingester/ingester.go index 216464b..744a047 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -19,10 +19,10 @@ type Ingester interface { // it will run continuously until the context is cancelled ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber int64, endBlockNumber int64) error - // ConsumeBlocks fetches blocks sent on the channel and sends them on the other channel. + // FetchBlockLoop fetches blocks sent on the channel and sends them on the other channel. // It will run continuously until the context is cancelled, or the channel is closed. // It can safely be run concurrently. - ConsumeBlocks(context.Context, chan int64, chan models.RPCBlock) error + FetchBlockLoop(context.Context, chan int64, chan models.RPCBlock) error // SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop // it will block until: diff --git a/ingester/mainloop.go b/ingester/mainloop.go index 74d0f3d..0f86e3e 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -13,10 +13,10 @@ import ( // Run fetches blocks from a node RPC and sends them in order to the Dune API. // -// ProduceBlockNumbers (blockNumbers channel) -> ConsumeBlocks (blocks channel) -> SendBlocks -> Dune +// ProduceBlockNumbers (blockNumbers channel) -> FetchBlockLoop (blocks channel) -> SendBlocks -> Dune // // We produce block numbers to fetch on an unbuffered channel (ProduceBlockNumbers), -// and each concurrent ConsumeBlock goroutine gets a block number from that channel. +// and each concurrent FetchBlockLoop goroutine gets a block number from that channel. // The SendBlocks goroutine receives all blocks on an unbuffered channel, // but buffers them in a map until they can be sent in order. func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error { @@ -31,7 +31,7 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int // Start MaxBatchSize goroutines to consume blocks concurrently for range i.cfg.MaxBatchSize { errGroup.Go(func() error { - return i.ConsumeBlocks(ctx, blockNumbers, blocks) + return i.FetchBlockLoop(ctx, blockNumbers, blocks) }) } errGroup.Go(func() error { @@ -60,9 +60,9 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int return errGroup.Wait() } -var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks") +var ErrFinishedFetchBlockLoop = errors.New("finished FetchBlockLoop") -// ProduceBlockNumbers to be consumed by multiple goroutines running ConsumeBlocks +// ProduceBlockNumbers to be consumed by multiple goroutines running FetchBlockLoop func (i *ingester) ProduceBlockNumbers( ctx context.Context, blockNumbers chan int64, startBlockNumber int64, endBlockNumber int64, ) error { @@ -109,17 +109,17 @@ func (i *ingester) ProduceBlockNumbers( } } i.log.Info("Finished producing block numbers") - return ErrFinishedConsumeBlocks + return ErrFinishedFetchBlockLoop } -// ConsumeBlocks from the RPC node. This can be run in multiple goroutines to parallelize block fetching. -func (i *ingester) ConsumeBlocks( +// FetchBlockLoop from the RPC node. This can be run in multiple goroutines to parallelize block fetching. +func (i *ingester) FetchBlockLoop( ctx context.Context, blockNumbers chan int64, blocks chan models.RPCBlock, ) error { for { select { case <-ctx.Done(): - i.log.Info("ConsumeBlocks: context is done") + i.log.Info("FetchBlockLoop: context is done") return ctx.Err() case blockNumber := <-blockNumbers: startTime := time.Now() @@ -127,7 +127,7 @@ func (i *ingester) ConsumeBlocks( block, err := i.node.BlockByNumber(ctx, blockNumber) if err != nil { if errors.Is(err, context.Canceled) { - i.log.Info("ConsumeBlocks: Context canceled, stopping") + i.log.Info("FetchBlockLoop: Context canceled, stopping") return ctx.Err() } @@ -150,7 +150,7 @@ func (i *ingester) ConsumeBlocks( i.log.Info("Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed) select { case <-ctx.Done(): - i.log.Info("ConsumeBlocks: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber) + i.log.Info("FetchBlockLoop: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber) return ctx.Err() case blocks <- block: } @@ -158,12 +158,12 @@ func (i *ingester) ConsumeBlocks( } } -// SendBlocks to Dune. We receive blocks from the ConsumeBlocks goroutines, potentially out of order. +// SendBlocks to Dune. We receive blocks from the FetchBlockLoop goroutines, potentially out of order. // We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune. func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startBlockNumber int64) error { i.log.Info("SendBlocks: Starting to receive blocks") blockMap := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order - next := startBlockNumber + nextNumberToSend := startBlockNumber for { select { case <-ctx.Done(): @@ -176,32 +176,46 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo } blockMap[block.BlockNumber] = block + i.log.Info("SendBlocks: Received block", "blockNumber", block.BlockNumber) - // Send this block only if we have sent all previous blocks - for block, ok := blockMap[next]; ok; block, ok = blockMap[next] { - if err := i.dune.SendBlock(ctx, block); err != nil { - if errors.Is(err, context.Canceled) { - i.log.Info("SendBlocks: Context canceled, stopping") - return ctx.Err() - } - // TODO: implement DeadLetterQueue - // this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap - i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err) - i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{ - Timestamp: time.Now(), - BlockNumber: block.BlockNumber, - Error: err, - }) - } else { - atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber) - } + nextNumberToSend = i.trySendCompletedBlocks(ctx, blockMap, nextNumberToSend) + i.log.Info("SendBlocks: Sent any completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend) + } + } +} - // We've sent block N, so increment the pointer - delete(blockMap, next) - next++ +// trySendCompletedBlocks sends all blocks that can be sent, in order, from the blockMap. +// Once we have sent all blocks, if any to Dune, we return with the nextNumberToSend. +// We have to return the next numberToSend such that the caller can continue from there. +func (i *ingester) trySendCompletedBlocks( + ctx context.Context, + blockMap map[int64]models.RPCBlock, + nextNumberToSend int64, +) int64 { + // Send this block only if we have sent all previous blocks + for block, ok := blockMap[nextNumberToSend]; ok; block, ok = blockMap[nextNumberToSend] { + if err := i.dune.SendBlock(ctx, block); err != nil { + if errors.Is(err, context.Canceled) { + i.log.Info("SendBlocks: Context canceled, stopping") + return nextNumberToSend } + // TODO: implement DeadLetterQueue + // this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap + i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err) + i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{ + Timestamp: time.Now(), + BlockNumber: block.BlockNumber, + Error: err, + }) + } else { + i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber) + atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber) } + // We've sent block N, so increment the pointer + delete(blockMap, nextNumberToSend) + nextNumberToSend++ } + return nextNumberToSend } func (i *ingester) tryUpdateLatestBlockNumber() int64 { diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index 907c2af..5ccb285 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -158,10 +158,8 @@ func TestRunLoopUntilBlocksOutOfOrder(t *testing.T) { producedBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ SendBlockFunc: func(_ context.Context, block models.RPCBlock) error { - // DuneAPI must fail if it receives blocks out of order - if block.BlockNumber != sentBlockNumber+1 { - return errors.Errorf("blocks out of order") - } + // Test must fail if DuneAPI receives blocks out of order + require.Equal(t, block.BlockNumber, sentBlockNumber+1) atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) if block.BlockNumber == maxBlockNumber {