Fetch blocks concurrently but send in order
vegarsti committed Jun 21, 2024
1 parent f776e08 commit 0b18551
Showing 4 changed files with 246 additions and 240 deletions.
2 changes: 1 addition & 1 deletion cmd/main.go
@@ -82,7 +82,7 @@ func main() {
rpcClient,
duneClient,
ingester.Config{
MaxBatchSize: 1,
MaxBatchSize: 10,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
Stack: cfg.RPCStack,
13 changes: 9 additions & 4 deletions ingester/ingester.go
@@ -14,17 +14,22 @@ type Ingester interface {
// Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested
Run(ctx context.Context, startBlockNumber int64, maxCount int64) error

// ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
// ProduceBlockNumbers sends block numbers from startBlockNumber to endBlockNumber to outChan, inclusive.
// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
// it will run continuously until the context is cancelled
ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber int64, endBlockNumber int64) error
ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber int64, endBlockNumber int64) error

// ConsumeBlocks fetches blocks sent on the channel and sends them on the other channel.
// It will run continuously until the context is cancelled, or the channel is closed.
// It can safely be run concurrently.
ConsumeBlocks(context.Context, chan int64, chan models.RPCBlock) error

// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
// it will block until:
// - the context is cancelled
// - channel is closed
// - a fatal error occurs
SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error
SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startFrom int64) error

// This is just a placeholder for now
Info() Info
@@ -33,7 +38,7 @@ type Ingester interface {
}

const (
defaultMaxBatchSize = 1
defaultMaxBatchSize = 10
defaultPollInterval = 1 * time.Second
defaultReportProgressInterval = 30 * time.Second
)
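
The interface comments above describe a three-stage pipeline. As a rough sketch of how the stages compose (illustrative only, not code from this commit — ing, ctx, and the block range 100..200 are hypothetical):

blockNumbers := make(chan int64)     // ProduceBlockNumbers -> ConsumeBlocks
blocks := make(chan models.RPCBlock) // ConsumeBlocks -> SendBlocks

for range 10 { // ConsumeBlocks is documented as safe to run concurrently
	go ing.ConsumeBlocks(ctx, blockNumbers, blocks)
}
go ing.SendBlocks(ctx, blocks, 100) // buffers out-of-order arrivals, sends 100, 101, ... in order
ing.ProduceBlockNumbers(ctx, blockNumbers, 100, 200) // emits 100..200 to the fetchers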
221 changes: 142 additions & 79 deletions ingester/mainloop.go
@@ -11,39 +11,57 @@ import (
"golang.org/x/sync/errgroup"
)

// Run fetches blocks from a node RPC and sends them in order to the Dune API.
//
// ProduceBlockNumbers (blockNumbers channel) -> ConsumeBlocks (blocks channel) -> SendBlocks -> Dune
//
// We produce block numbers to fetch on an unbuffered channel (ProduceBlockNumbers),
// and each concurrent ConsumeBlock goroutine gets a block number from that channel.
// The SendBlocks goroutine receives all blocks on an unbuffered channel,
// but buffers them in a map until they can be sent in order.
func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error {
ctx, cancel := context.WithCancel(ctx)
errGroup, ctx := errgroup.WithContext(ctx)

inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize) // we close this after ConsumeBlocks has returned
// Produce block numbers on blockNumbers -> fetch blocks concurrently, send on blocks -> send blocks to DuneAPI
// ProduceBlockNumbers -> ConsumeBlocks -> SendBlocks
blockNumbers := make(chan int64)
defer close(blockNumbers)
blocks := make(chan models.RPCBlock)
defer close(blocks)

// Start MaxBatchSize goroutines to consume blocks concurrently
for range i.cfg.MaxBatchSize {
errGroup.Go(func() error {
return i.ConsumeBlocks(ctx, blockNumbers, blocks)
})
}
// Progress reporting goroutine
errGroup.Go(func() error {
return i.ReportProgress(ctx)
})
// One goroutine for sending blocks to DuneAPI
errGroup.Go(func() error {
return i.SendBlocks(ctx, blocks, startBlockNumber)
})

// Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever
endBlockNumber := startBlockNumber - 1 + maxCount

i.log.Info("Starting ingester",
"maxBatchSize", i.cfg.MaxBatchSize,
"startBlockNumber", startBlockNumber,
"endBlockNumber", endBlockNumber,
"maxCount", maxCount,
"max_batch_size", i.cfg.MaxBatchSize,
"run_forever", maxCount <= 0,
"start_block_number", startBlockNumber,
"end_block_number", endBlockNumber,
"batch_size", i.cfg.MaxBatchSize,
)

errGroup, ctx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return i.SendBlocks(ctx, inFlightChan)
})
errGroup.Go(func() error {
return i.ReportProgress(ctx)
})

err := i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, endBlockNumber)
close(inFlightChan)
// Produce block numbers in main goroutine
err := i.ProduceBlockNumbers(ctx, blockNumbers, startBlockNumber, endBlockNumber)
i.log.Info("ProduceBlockNumbers is done", "error", err)
i.log.Info("Cancelling context")
cancel()
if err != nil {
if err := errGroup.Wait(); err != nil {
i.log.Error("errgroup wait", "error", err)
}
return errors.Errorf("consume blocks: %w", err)
}

i.log.Info("waiting on err group")
if err := errGroup.Wait(); err != nil && err != ErrFinishedConsumeBlocks {
return err
}
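
For reference, the concurrency shape introduced in Run above — a producer in the calling goroutine, a pool of errgroup workers fanning in to a single collector, and cancellation once the producer finishes — reduces to a small self-contained program. This is a sketch of the pattern under illustrative names and values, not code from this repository:

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	errGroup, ctx := errgroup.WithContext(ctx)

	jobs := make(chan int64)    // like blockNumbers
	results := make(chan int64) // like blocks

	// Fan out: workers receive from the same unbuffered channel (like ConsumeBlocks).
	for range 3 {
		errGroup.Go(func() error {
			for {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case n := <-jobs:
					select {
					case <-ctx.Done():
						return ctx.Err()
					case results <- n * n: // stand-in for fetching a block
					}
				}
			}
		})
	}

	// Fan in: one collector (like SendBlocks).
	errGroup.Go(func() error {
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case r := <-results:
				fmt.Println("got:", r)
			}
		}
	})

	// Produce in the calling goroutine, then cancel so Wait can return.
	for n := int64(1); n <= 5; n++ {
		jobs <- n
	}
	cancel()
	if err := errGroup.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		fmt.Println("unexpected error:", err)
	}
}

Note the shutdown ordering this pattern implies: cancel() fires as soon as the producer returns, so work still in flight at that instant can be dropped; harmless in this demo, and worth keeping in mind when reading the real loop.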
@@ -53,12 +71,13 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int

var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")

// ConsumeBlocks from the RPC node
func (i *ingester) ConsumeBlocks(
ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64,
// ProduceBlockNumbers to be consumed by multiple goroutines running ConsumeBlocks
func (i *ingester) ProduceBlockNumbers(
ctx context.Context, blockNumbers chan int64, startBlockNumber int64, endBlockNumber int64,
) error {
latestBlockNumber := i.tryUpdateLatestBlockNumber()

// Helper function
waitForBlock := func(ctx context.Context, blockNumber int64, latestBlockNumber int64) int64 {
for blockNumber > latestBlockNumber {
select {
@@ -77,82 +96,126 @@

// Consume blocks forever if end is before start. This happens if Run is called with a maxCount of <= 0
dontStop := endBlockNumber < startBlockNumber

i.log.Info("Produce block numbers from", "startBlockNumber", startBlockNumber, "endBlockNumber", endBlockNumber)
for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {
latestBlockNumber = waitForBlock(ctx, blockNumber, latestBlockNumber)
startTime := time.Now()

i.log.Info("Getting block by number", "blockNumber", blockNumber, "latestBlockNumber", latestBlockNumber)
block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("Context canceled, stopping..")
return ctx.Err()
}

i.log.Error("Failed to get block by number, continuing..",
"blockNumber", blockNumber,
"latestBlockNumber", latestBlockNumber,
"error", err,
)
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: blockNumber,
Error: err,
})

// TODO: should I sleep (backoff) here?
continue
}

atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
getBlockElapsed := time.Since(startTime)

select {
case <-ctx.Done():
i.log.Info("ProduceBlockNumbers: Context canceled, stopping")
return ctx.Err()
case outChan <- block:
case blockNumbers <- blockNumber:
}

distanceFromLatest := latestBlockNumber - block.BlockNumber
distanceFromLatest := latestBlockNumber - blockNumber
if distanceFromLatest > 0 {
// TODO: improve logs of processing speed and catchup estimated ETA
i.log.Info("We're behind, trying to catch up..",
"blockNumber", block.BlockNumber,
"blockNumber", blockNumber,
"latestBlockNumber", latestBlockNumber,
"distanceFromLatest", distanceFromLatest,
"getBlockElapsedMillis", getBlockElapsed.Milliseconds(),
"elapsedMillis", time.Since(startTime).Milliseconds(),
)
}
}
// Done consuming blocks, either because we reached the endBlockNumber or the context was canceled
i.log.Info("Finished consuming blocks", "latestBlockNumber", latestBlockNumber, "endBlockNumber", endBlockNumber)
i.log.Info("Finished producing block numbers")
return ErrFinishedConsumeBlocks
}

func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
for payload := range blocksCh {
// TODO: we should batch RPC blocks here before sending to Dune.
if err := i.dune.SendBlock(ctx, payload); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("Context canceled, stopping..")
// ConsumeBlocks from the RPC node. This can be run in multiple goroutines to parallelize block fetching.
func (i *ingester) ConsumeBlocks(
ctx context.Context, blockNumbers chan int64, blocks chan models.RPCBlock,
) error {
for {
select {
case <-ctx.Done():
i.log.Info("ConsumeBlocks: context is done")
return ctx.Err()
case blockNumber := <-blockNumbers:
startTime := time.Now()

i.log.Info("Getting block by number", "blockNumber", blockNumber)
block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("ConsumeBlocks: Context canceled, stopping")
return ctx.Err()
}

i.log.Error("Failed to get block by number, continuing..",
"blockNumber", blockNumber,
"error", err,
)
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: blockNumber,
Error: err,
})

// TODO: should we sleep (backoff) here?
continue
}

atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
getBlockElapsed := time.Since(startTime)
i.log.Info("Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed)
select {
case <-ctx.Done():
i.log.Info("ConsumeBlocks: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
return ctx.Err()
case blocks <- block:
i.log.Info("Sent block")
}
}
}
}

// SendBlocks to Dune. We receive blocks from the ConsumeBlocks goroutines, potentially out of order.
// We buffer the blocks in a map, and send them in order to Dune.
func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startBlockNumber int64) error {
i.log.Info("SendBlocks: Starting to receive blocks")
blockMap := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order
next := startBlockNumber
for {
select {
case <-ctx.Done():
i.log.Info("SendBlocks: Context canceled, stopping")
return ctx.Err()
case block, ok := <-blocksCh:
if !ok {
i.log.Info("SendBlocks: Channel is closed, returning")
return nil
}

blockMap[block.BlockNumber] = block
i.log.Info("Received block", "blockNumber", block.BlockNumber)

// Send this block only if we have sent all previous blocks
for block, ok := blockMap[next]; ok; block, ok = blockMap[next] {
i.log.Info("SendBlocks: Sending block to DuneAPI", "blockNumber", block.BlockNumber)
if err := i.dune.SendBlock(ctx, block); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return ctx.Err()
}
// TODO: implement DeadLetterQueue
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: block.BlockNumber,
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
}

// We've sent block N, so increment the pointer
delete(blockMap, next)
next++
}
// TODO: implement DeadLetterQueue
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlock failed, continuing..", "blockNumber", payload.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: payload.BlockNumber,
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", payload.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, payload.BlockNumber)
}
}
return ctx.Err() // channel closed
}

func (i *ingester) tryUpdateLatestBlockNumber() int64 {
Expand Down Expand Up @@ -183,7 +246,7 @@ func (i *ingester) ReportProgress(ctx context.Context) error {

blocksPerSec := float64(lastIngested-previousIngested) / tNow.Sub(previousTime).Seconds()
newDistance := latest - lastIngested
fallingBehind := newDistance > (previousDistance + 1) // TODO: make is more stable
fallingBehind := newDistance > (previousDistance + 1) // TODO: make this more stable

rpcErrors := len(i.info.RPCErrors)
duneErrors := len(i.info.DuneErrors)
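
SendBlocks' reordering technique — buffer out-of-order arrivals in a map, keep a next cursor, flush every consecutive entry after each receive — also works standalone. A minimal sketch with ints standing in for RPCBlocks (illustrative, not code from this commit):

package main

import "fmt"

// sendInOrder receives values that may arrive out of order and emits them
// strictly ascending, buffering gaps in a map (SendBlocks keys RPCBlocks by
// block number in the same way).
func sendInOrder(in <-chan int64, startFrom int64) {
	buffer := make(map[int64]int64)
	next := startFrom
	for v := range in {
		buffer[v] = v
		// Flush every consecutive value we now hold.
		for got, ok := buffer[next]; ok; got, ok = buffer[next] {
			fmt.Println("sending", got)
			delete(buffer, next)
			next++
		}
	}
}

func main() {
	in := make(chan int64)
	go func() {
		for _, v := range []int64{102, 100, 101, 104, 103} { // out-of-order arrival
			in <- v
		}
		close(in)
	}()
	sendInOrder(in, 100) // prints 100 through 104, in order
}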