Skip to content

Commit

Permalink
Send batch of blocks from the main loop
Browse files Browse the repository at this point in the history
  • Loading branch information
vegarsti committed Jun 27, 2024
1 parent a448231 commit f2fb85f
Show file tree
Hide file tree
Showing 7 changed files with 342 additions and 118 deletions.
15 changes: 7 additions & 8 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
checkRetry := func(ctx context.Context, resp *http.Response, err error) (bool, error) {
yes, err2 := retryablehttp.DefaultRetryPolicy(ctx, resp, err)
if yes {
log.Warn("Retrying request", "statusCode", resp.Status, "error", err)
if resp == nil {
log.Warn("Retrying request", "error", err)
} else {
log.Warn("Retrying request", "statusCode", resp.Status, "error", err)
}
}
return yes, err2
}
Expand Down Expand Up @@ -191,11 +195,6 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
return err
}

err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
return err
}

return nil
}

Expand All @@ -220,15 +219,15 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch
defer func() {
if err != nil {
c.log.Error("Sending progress report failed",
"lastIngestedBlockNumer", request.LastIngestedBlockNumber,
"lastIngestedBlockNumber", request.LastIngestedBlockNumber,
"error", err,
"statusCode", responseStatus,
"duration", time.Since(start),
"responseBody", responseBody,
)
} else {
c.log.Info("Sent progress report",
"lastIngestedBlockNumer", request.LastIngestedBlockNumber,
"lastIngestedBlockNumber", request.LastIngestedBlockNumber,
"latestBlockNumber", request.LatestBlockNumber,
"duration", time.Since(start),
)
Expand Down
8 changes: 6 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ func main() {
// Get stored progress unless config indicates we should start from 0
var startBlockNumber int64
// Default to -1 to start where the ingester left off
var progress *models.BlockchainIndexProgress
if cfg.BlockHeight == -1 {
progress, err := duneClient.GetProgressReport(ctx)
progress, err = duneClient.GetProgressReport(ctx)
if err != nil {
stdlog.Fatal(err)
} else {
Expand All @@ -82,12 +83,15 @@ func main() {
rpcClient,
duneClient,
ingester.Config{
MaxBatchSize: cfg.Concurrency,
MaxConcurrentRequests: cfg.RPCConcurrency,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
BlockSubmitInterval: cfg.BlockSubmitInterval,
SkipFailedBlocks: false,
},
progress,
)

wg.Add(1)
Expand Down
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ type Config struct {
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers" default:"5"` // nolint:lll
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent requests to the RPC node" default:"10"` // nolint:lll
BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"1s"` // nolint:lll
}

func (c Config) HasError() error {
Expand Down
31 changes: 22 additions & 9 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ const (
)

type Config struct {
MaxBatchSize int
MaxConcurrentRequests int
PollInterval time.Duration
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
BlockSubmitInterval time.Duration
SkipFailedBlocks bool
}

type Info struct {
Expand All @@ -60,9 +62,9 @@ type Info struct {
}

type ErrorInfo struct {
Timestamp time.Time
BlockNumber int64
Error error
Timestamp time.Time
BlockNumbers string
Error error
}

type ingester struct {
Expand All @@ -73,16 +75,27 @@ type ingester struct {
info Info
}

func New(log *slog.Logger, node jsonrpc.BlockchainClient, dune duneapi.BlockchainIngester, cfg Config) Ingester {
func New(
log *slog.Logger,
node jsonrpc.BlockchainClient,
dune duneapi.BlockchainIngester,
cfg Config,
progress *models.BlockchainIndexProgress,
) Ingester {
info := Info{
RPCErrors: []ErrorInfo{},
DuneErrors: []ErrorInfo{},
}
if progress != nil {
info.LatestBlockNumber = progress.LatestBlockNumber
info.IngestedBlockNumber = progress.LastIngestedBlockNumber
}
ing := &ingester{
log: log.With("module", "ingester"),
node: node,
dune: dune,
cfg: cfg,
info: Info{
RPCErrors: []ErrorInfo{},
DuneErrors: []ErrorInfo{},
},
info: info,
}
if ing.cfg.PollInterval == 0 {
ing.cfg.PollInterval = defaultPollInterval
Expand Down
167 changes: 124 additions & 43 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ingester
import (
"context"
"fmt"
"strings"
"sync/atomic"
"time"

Expand All @@ -21,15 +22,22 @@ import (
// but buffers them in a map until they can be sent in order.
func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
errGroup, ctx := errgroup.WithContext(ctx)

blockNumbers := make(chan int64)
defer close(blockNumbers)
blocks := make(chan models.RPCBlock)

// We buffer the block channel so that RPC requests can be made concurrently with sending blocks to Dune.
// We limit the buffer size to the same number of concurrent requests, so we exert some backpressure.
blocks := make(chan models.RPCBlock, i.cfg.MaxConcurrentRequests)
defer close(blocks)

// Start MaxBatchSize goroutines to consume blocks concurrently
for range i.cfg.MaxBatchSize {
if i.cfg.MaxConcurrentRequests <= 0 {
return errors.Errorf("MaxConcurrentRequests must be > 0")
}
for range i.cfg.MaxConcurrentRequests {
errGroup.Go(func() error {
return i.FetchBlockLoop(ctx, blockNumbers, blocks)
})
Expand All @@ -44,11 +52,10 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
// Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever
endBlockNumber := startBlockNumber - 1 + maxCount
i.log.Info("Starting ingester",
"max_batch_size", i.cfg.MaxBatchSize,
"run_forever", maxCount <= 0,
"start_block_number", startBlockNumber,
"end_block_number", endBlockNumber,
"batch_size", i.cfg.MaxBatchSize,
"runForever", maxCount <= 0,
"startBlockNumber", startBlockNumber,
"endBlockNumber", endBlockNumber,
"maxConcurrency", i.cfg.MaxConcurrentRequests,
)

// Produce block numbers in the main goroutine
Expand Down Expand Up @@ -136,87 +143,161 @@ func (i *ingester) FetchBlockLoop(
"error", err,
)
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: blockNumber,
Error: err,
Timestamp: time.Now(),
BlockNumbers: fmt.Sprintf("%d", blockNumber),
Error: err,
})

// TODO: should we sleep (backoff) here?
continue
// We need to send an empty block downstream to indicate that this failed
if i.cfg.SkipFailedBlocks {
blocks <- block
continue
}
return err
}

atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
getBlockElapsed := time.Since(startTime)
i.log.Info("FetchBlockLoop: Got block by number", "blockNumber", blockNumber, "elapsed", getBlockElapsed)
startTime = time.Now()
select {
case <-ctx.Done():
i.log.Info("FetchBlockLoop: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
return ctx.Err()
case blocks <- block:
i.log.Info("FetchBlockLoop: Sent block", "blockNumber", blockNumber, "elapsed", time.Since(startTime))
i.log.Info(
"FetchBlockLoop: Got and sent block",
"blockNumber", blockNumber,
"getBlockElapsed", getBlockElapsed,
"sendBlockElapsed", time.Since(startTime),
)
}
}
}
}

// SendBlocks to Dune. We receive blocks from the FetchBlockLoop goroutines, potentially out of order.
// We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune.
func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock, startBlockNumber int64) error {
i.log.Info("SendBlocks: Starting to receive blocks")
blocks := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order
func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock, startBlockNumber int64) error {
// Buffer for temporarily storing blocks that have arrived out of order
collectedBlocks := make(map[int64]models.RPCBlock)
nextNumberToSend := startBlockNumber
batchTimer := time.NewTicker(i.cfg.BlockSubmitInterval)
defer batchTimer.Stop()

i.log.Info("SendBlocks: Starting to receive blocks")
for {
// Either receive a block, send blocks, or shut down (if the context is done, or the channel is closed).
select {
case <-ctx.Done():
i.log.Info("SendBlocks: Context canceled, stopping")
return ctx.Err()
case block, ok := <-blocksCh:
case block, ok := <-blocks:
if !ok {
i.log.Info("SendBlocks: Channel is closed, returning")
return nil
}

blocks[block.BlockNumber] = block
i.log.Info("SendBlocks: Received block", "blockNumber", block.BlockNumber, "bufferSize", len(blocks))

nextNumberToSend = i.trySendCompletedBlocks(ctx, blocks, nextNumberToSend)
i.log.Info("SendBlocks: Sent any completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend)
// We got an empty block from the RPC client goroutine, skip to proceed with the next block
if block.Empty() {
if i.cfg.SkipFailedBlocks {
i.log.Info("Skipping empty block", "nextNumberToSend", nextNumberToSend)
nextNumberToSend++
continue
}
i.log.Info("Received empty block, exiting", "nextNumberToSend", nextNumberToSend)
return errors.Errorf("empty block received")
}
collectedBlocks[block.BlockNumber] = block
i.log.Info(
"SendBlocks: Received block",
"blockNumber", block.BlockNumber,
"bufferSize", len(collectedBlocks),
)
case <-batchTimer.C:
var err error
nextNumberToSend, err = i.trySendCompletedBlocks(ctx, collectedBlocks, nextNumberToSend)
if err != nil {
return errors.Errorf("send blocks: %w", err)
}
i.log.Info("SendBlocks: Sent completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend)
}
}
}

const maxBatchSize = 100

// trySendCompletedBlocks sends all blocks that can be sent in order from the blockMap.
// Once we have sent all blocks, if any, we return with the nextNumberToSend.
// We return the next numberToSend such that the caller can continue from there.
func (i *ingester) trySendCompletedBlocks(
ctx context.Context,
blocks map[int64]models.RPCBlock,
collectedBlocks map[int64]models.RPCBlock,
nextNumberToSend int64,
) int64 {
// Send this block only if we have sent all previous blocks
for block, ok := blocks[nextNumberToSend]; ok; block, ok = blocks[nextNumberToSend] {
if err := i.dune.SendBlocks(ctx, []models.RPCBlock{block}); err != nil {
) (int64, error) {
// Outer loop: We might need to send multiple batch requests if our buffer is too big
for _, ok := collectedBlocks[nextNumberToSend]; ok; _, ok = collectedBlocks[nextNumberToSend] {
// Collect a blocks of blocks to send, only send those which are in order
blocks := make([]models.RPCBlock, 0, len(collectedBlocks))
for block, ok := collectedBlocks[nextNumberToSend]; ok; block, ok = collectedBlocks[nextNumberToSend] {
blocks = append(blocks, block)
delete(collectedBlocks, nextNumberToSend)
nextNumberToSend++
// Don't send more than maxBatchSize blocks
if len(blocks) == maxBatchSize {
break
}
}

if len(blocks) == 0 {
return nextNumberToSend, nil
}

// Send the batch
lastBlockNumber := blocks[len(blocks)-1].BlockNumber
if lastBlockNumber != nextNumberToSend-1 {
panic("unexpected last block number")
}
if err := i.dune.SendBlocks(ctx, blocks); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return nextNumberToSend
return nextNumberToSend, nil
}
if !i.cfg.SkipFailedBlocks {
err := errors.Errorf("failed to send batch: %w", err)
i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err)
return nextNumberToSend, err
}
if i.cfg.SkipFailedBlocks {
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error(
"SendBlocks: Failed to send batch, continuing",
"blockNumberFirst", blocks[0].BlockNumber,
"blockNumberLast", blocks[len(blocks)-1].BlockNumber,
"error", err,
)
blockNumbers := make([]string, len(blocks))
for i, block := range blocks {
blockNumbers[i] = fmt.Sprintf("%d", block.BlockNumber)
}
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumbers: strings.Join(blockNumbers, ","),
Error: err,
})
continue
}
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: block.BlockNumber,
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
i.log.Info(
"SendBlocks: Sent batch, updating latest ingested block number",
"blockNumberFirst", blocks[0].BlockNumber,
"blockNumberLast", lastBlockNumber,
"batchSize", len(blocks),
)

atomic.StoreInt64(&i.info.IngestedBlockNumber, lastBlockNumber)
}
// We've sent block N, so increment the pointer
delete(blocks, nextNumberToSend)
nextNumberToSend++
}
return nextNumberToSend

return nextNumberToSend, nil
}

func (i *ingester) tryUpdateLatestBlockNumber() int64 {
Expand Down
Loading

0 comments on commit f2fb85f

Please sign in to comment.