Fetch blocks concurrently but send in order #32

Merged: 1 commit, Jun 21, 2024
cmd/main.go (2 changes: 1 addition & 1 deletion)
@@ -82,7 +82,7 @@ func main() {
rpcClient,
duneClient,
ingester.Config{
MaxBatchSize: 1,
MaxBatchSize: cfg.Concurrency,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
Stack: cfg.RPCStack,
config/config.go (1 change: 1 addition & 0 deletions)
@@ -40,6 +40,7 @@ type Config struct {
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers"` // nolint:lll
}

func (c Config) HasError() error {
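The struct tags above (`long`, `env`, `description`) follow jessevdk/go-flags conventions, so the new worker count should be settable either as a CLI flag or as an environment variable. A minimal sketch under that assumption; the trimmed Config copy and standalone main here are illustrative, not the repo's actual wiring:

```go
package main

import (
	"fmt"

	flags "github.com/jessevdk/go-flags"
)

// Trimmed, hypothetical copy of the relevant field; the real struct
// lives in config/config.go.
type Config struct {
	Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers"`
}

func main() {
	var cfg Config
	// Parses CLI flags, falling back to the CONCURRENCY env var,
	// e.g. `./app --concurrency 8` or `CONCURRENCY=8 ./app`.
	if _, err := flags.Parse(&cfg); err != nil {
		return
	}
	fmt.Println("workers:", cfg.Concurrency)
}
```

Note that the flag declares no `default` tag, so cfg.Concurrency is 0 when unset; presumably the ingester then falls back to defaultMaxBatchSize, which this PR bumps from 1 to 5 (see ingester/ingester.go below).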
ingester/ingester.go (13 changes: 9 additions & 4 deletions)
@@ -14,17 +14,22 @@ type Ingester interface {
// Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested
Run(ctx context.Context, startBlockNumber int64, maxCount int64) error

// ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
// ProduceBlockNumbers sends block numbers from startBlockNumber to endBlockNumber to outChan, inclusive.
// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
// it will run continuously until the context is cancelled
ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber int64, endBlockNumber int64) error
ProduceBlockNumbers(ctx context.Context, outChan chan int64, startBlockNumber int64, endBlockNumber int64) error

// ConsumeBlocks fetches blocks sent on the channel and sends them on the other channel.
// It will run continuously until the context is cancelled, or the channel is closed.
// It can safely be run concurrently.
ConsumeBlocks(context.Context, chan int64, chan models.RPCBlock) error

// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
// it will block until:
// - the context is cancelled
// - channel is closed
// - a fatal error occurs
SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error
SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startFrom int64) error

// This is just a placeholder for now
Info() Info
@@ -33,7 +38,7 @@ type Ingester interface {
}

const (
defaultMaxBatchSize = 1
defaultMaxBatchSize = 5
defaultPollInterval = 1 * time.Second
defaultReportProgressInterval = 30 * time.Second
)
ingester/mainloop.go (222 changes: 138 additions & 84 deletions)
@@ -11,54 +11,64 @@ import (
"golang.org/x/sync/errgroup"
)

// Run fetches blocks from a node RPC and sends them in order to the Dune API.
//
// ProduceBlockNumbers (blockNumbers channel) -> ConsumeBlocks (blocks channel) -> SendBlocks -> Dune
//
// We produce block numbers to fetch on an unbuffered channel (ProduceBlockNumbers),
// and each concurrent ConsumeBlock goroutine gets a block number from that channel.
// The SendBlocks goroutine receives all blocks on an unbuffered channel,
// but buffers them in a map until they can be sent in order.
func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error {
ctx, cancel := context.WithCancel(ctx)
errGroup, ctx := errgroup.WithContext(ctx)

inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize) // we close this after ConsumeBlocks has returned

// Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever
endBlockNumber := startBlockNumber - 1 + maxCount

i.log.Info("Starting ingester",
"maxBatchSize", i.cfg.MaxBatchSize,
"startBlockNumber", startBlockNumber,
"endBlockNumber", endBlockNumber,
"maxCount", maxCount,
)
blockNumbers := make(chan int64)
defer close(blockNumbers)
blocks := make(chan models.RPCBlock)
defer close(blocks)

errGroup, ctx := errgroup.WithContext(ctx)
// Start MaxBatchSize goroutines to consume blocks concurrently
for range i.cfg.MaxBatchSize {
errGroup.Go(func() error {
return i.ConsumeBlocks(ctx, blockNumbers, blocks)
})
}
errGroup.Go(func() error {
return i.SendBlocks(ctx, inFlightChan)
return i.ReportProgress(ctx)
})
errGroup.Go(func() error {
return i.ReportProgress(ctx)
return i.SendBlocks(ctx, blocks, startBlockNumber)
})

err := i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, endBlockNumber)
close(inFlightChan)
cancel()
if err != nil {
if err := errGroup.Wait(); err != nil {
i.log.Error("errgroup wait", "error", err)
}
return errors.Errorf("consume blocks: %w", err)
}
// Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever
endBlockNumber := startBlockNumber - 1 + maxCount
i.log.Info("Starting ingester",
"max_batch_size", i.cfg.MaxBatchSize,
"run_forever", maxCount <= 0,
"start_block_number", startBlockNumber,
"end_block_number", endBlockNumber,
"batch_size", i.cfg.MaxBatchSize,
)

if err := errGroup.Wait(); err != nil && err != ErrFinishedConsumeBlocks {
return err
}
// Produce block numbers in the main goroutine
err := i.ProduceBlockNumbers(ctx, blockNumbers, startBlockNumber, endBlockNumber)
i.log.Info("ProduceBlockNumbers is done", "error", err)
i.log.Info("Cancelling context")
cancel()

return nil
return errGroup.Wait()
}

var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")

// ConsumeBlocks from the NPC Node
func (i *ingester) ConsumeBlocks(
ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64,
// ProduceBlockNumbers to be consumed by multiple goroutines running ConsumeBlocks
func (i *ingester) ProduceBlockNumbers(
ctx context.Context, blockNumbers chan int64, startBlockNumber int64, endBlockNumber int64,
) error {
latestBlockNumber := i.tryUpdateLatestBlockNumber()

// Helper function
waitForBlock := func(ctx context.Context, blockNumber int64, latestBlockNumber int64) int64 {
for blockNumber > latestBlockNumber {
select {
@@ -77,82 +87,126 @@ func (i *ingester) ConsumeBlocks(

// Consume blocks forever if end is before start. This happens if Run is called with a maxCount of <= 0
dontStop := endBlockNumber < startBlockNumber

i.log.Info("Produce block numbers from", "startBlockNumber", startBlockNumber, "endBlockNumber", endBlockNumber)
for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {
latestBlockNumber = waitForBlock(ctx, blockNumber, latestBlockNumber)
startTime := time.Now()

i.log.Info("Getting block by number", "blockNumber", blockNumber, "latestBlockNumber", latestBlockNumber)
block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("Context canceled, stopping..")
return ctx.Err()
}

i.log.Error("Failed to get block by number, continuing..",
"blockNumber", blockNumber,
"latestBlockNumber", latestBlockNumber,
"error", err,
)
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: blockNumber,
Error: err,
})

// TODO: should I sleep (backoff) here?
continue
}

atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
getBlockElapsed := time.Since(startTime)

select {
case <-ctx.Done():
i.log.Info("ProduceBlockNumbers: Context canceled, stopping")
return ctx.Err()
case outChan <- block:
case blockNumbers <- blockNumber:
}

distanceFromLatest := latestBlockNumber - block.BlockNumber
distanceFromLatest := latestBlockNumber - blockNumber
if distanceFromLatest > 0 {
// TODO: improve logs of processing speed and catchup estimated ETA
i.log.Info("We're behind, trying to catch up..",
"blockNumber", block.BlockNumber,
"blockNumber", blockNumber,
"latestBlockNumber", latestBlockNumber,
"distanceFromLatest", distanceFromLatest,
"getBlockElapsedMillis", getBlockElapsed.Milliseconds(),
"elapsedMillis", time.Since(startTime).Milliseconds(),
)
}
}
// Done consuming blocks, either because we reached the endBlockNumber or the context was canceled
i.log.Info("Finished consuming blocks", "latestBlockNumber", latestBlockNumber, "endBlockNumber", endBlockNumber)
i.log.Info("Finished producing block numbers")
return ErrFinishedConsumeBlocks
}

func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
for payload := range blocksCh {
// TODO: we should batch RCP blocks here before sending to Dune.
if err := i.dune.SendBlock(ctx, payload); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("Context canceled, stopping..")
// ConsumeBlocks from the RPC node. This can be run in multiple goroutines to parallelize block fetching.
func (i *ingester) ConsumeBlocks(
ctx context.Context, blockNumbers chan int64, blocks chan models.RPCBlock,
) error {
for {
select {
case <-ctx.Done():
i.log.Info("ConsumeBlocks: context is done")
return ctx.Err()
case blockNumber := <-blockNumbers:
startTime := time.Now()

i.log.Info("Getting block by number", "blockNumber", blockNumber)
block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("ConsumeBlocks: Context canceled, stopping")
return ctx.Err()
}

i.log.Error("Failed to get block by number, continuing..",
"blockNumber", blockNumber,
"error", err,
)
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: blockNumber,
Error: err,
})

// TODO: should we sleep (backoff) here?
continue
}

atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
getBlockElapsed := time.Since(startTime)
i.log.Info("Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed)
select {
case <-ctx.Done():
i.log.Info("ConsumeBlocks: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
return ctx.Err()
case blocks <- block:
i.log.Info("Sent block")
}
}
}
}

// SendBlocks to Dune. We receive blocks from the ConsumeBlocks goroutines, potentially out of order.
// We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune.
func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startBlockNumber int64) error {
i.log.Info("SendBlocks: Starting to receive blocks")
blockMap := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order
next := startBlockNumber
Contributor: nextNumberToSend?

Member Author: yes, agreed

for {
select {
case <-ctx.Done():
i.log.Info("SendBlocks: Context canceled, stopping")
return ctx.Err()
case block, ok := <-blocksCh:
if !ok {
i.log.Info("SendBlocks: Channel is closed, returning")
return nil
}

blockMap[block.BlockNumber] = block
i.log.Info("Received block", "blockNumber", block.BlockNumber)

// Send this block only if we have sent all previous blocks
for block, ok := blockMap[next]; ok; block, ok = blockMap[next] {
Contributor: this is smart for loop :-)

Member Author: ChatGPT showed me this 😄

i.log.Info("SendBlocks: Sending block to DuneAPI", "blockNumber", block.BlockNumber)
Contributor: this should be an isolated function: next = trySendCompletedBlocks(blockMap, next, ...) ?

Member Author: yeah good call!

if err := i.dune.SendBlock(ctx, block); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return ctx.Err()
}
// TODO: implement DeadLetterQueue
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: block.BlockNumber,
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
}

// We've sent block N, so increment the pointer
delete(blockMap, next)
next++
}
// TODO: implement DeadLetterQueue
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlock failed, continuing..", "blockNumber", payload.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: payload.BlockNumber,
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", payload.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, payload.BlockNumber)
}
}
return ctx.Err() // channel closed
}

func (i *ingester) tryUpdateLatestBlockNumber() int64 {
@@ -183,7 +237,7 @@ func (i *ingester) ReportProgress(ctx context.Context) error {

blocksPerSec := float64(lastIngested-previousIngested) / tNow.Sub(previousTime).Seconds()
newDistance := latest - lastIngested
fallingBehind := newDistance > (previousDistance + 1) // TODO: make is more stable
fallingBehind := newDistance > (previousDistance + 1) // TODO: make this more stable

rpcErrors := len(i.info.RPCErrors)
duneErrors := len(i.info.DuneErrors)
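Taken together, the pipeline this PR builds is one producer feeding block numbers to MaxBatchSize concurrent fetchers, and one sender that reassembles results in order using the map-plus-next-pointer buffer (including the comma-ok for loop the reviewers call out above). The self-contained program below illustrates that pattern; fetch, send, block, and workers are illustrative stand-ins for the repo's RPC client, Dune client, and config, not its actual API:

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

type block struct{ number int64 }

// fetch stands in for the RPC call (node.BlockByNumber in the PR).
func fetch(ctx context.Context, n int64) (block, error) {
	return block{number: n}, nil
}

// send stands in for the Dune API call (dune.SendBlock in the PR).
func send(ctx context.Context, b block) error {
	fmt.Println("sent block", b.number)
	return nil
}

func main() {
	const workers = 5
	start, end := int64(1), int64(20)

	g, ctx := errgroup.WithContext(context.Background())
	numbers := make(chan int64)
	blocks := make(chan block)

	// Producer: emit block numbers in order (ProduceBlockNumbers).
	g.Go(func() error {
		defer close(numbers)
		for n := start; n <= end; n++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case numbers <- n:
			}
		}
		return nil
	})

	// Workers: fetch concurrently (ConsumeBlocks); completion order is arbitrary.
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		g.Go(func() error {
			defer wg.Done()
			for n := range numbers {
				b, err := fetch(ctx, n)
				if err != nil {
					return err
				}
				select {
				case <-ctx.Done():
					return ctx.Err()
				case blocks <- b:
				}
			}
			return nil
		})
	}
	go func() { wg.Wait(); close(blocks) }() // close once every fetcher is done

	// Sender: buffer out-of-order arrivals, flush the contiguous run (SendBlocks).
	g.Go(func() error {
		buffered := make(map[int64]block)
		next := start
		for b := range blocks {
			buffered[b.number] = b
			for ready, ok := buffered[next]; ok; ready, ok = buffered[next] {
				if err := send(ctx, ready); err != nil {
					return err
				}
				delete(buffered, next)
				next++
			}
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		fmt.Println("pipeline error:", err)
	}
}
```

Because both channels are unbuffered, a slow send backpressures the fetchers, and the reorder map only grows while a gap (one slow fetch) is outstanding.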