Skip to content

Commit

Permalink
Remove optimization of not sending small batches (#50)
Browse files Browse the repository at this point in the history
Conduit reported "high latency". It _might_ be due to this.

We're currently not sending batches of <10 blocks (1/10th of the max
batch size of 100), to optimize throughput. When the indexer is caught
up, this leads to (imo) unnecessarily high latency, as BOB produces
blocks every 300ms and we'll thus only send a request every 3 seconds.
We should instead be sending as often as possible, or at least the
default of every half a second.

One option is to have some sort of "am I backfilling or not" flag. I
suggest just removing this optimization for now, and rather revisit this
if/when we see that the next time we're backfilling, we're unnecessarily
sending many small batches.

I'm fairly convinced that when backfilling, this optimization ~never
triggers anyway, since the requests to the node should be filling up the
buffer of blocks to send.
  • Loading branch information
vegarsti authored Jul 1, 2024
1 parent 71d4bad commit 82dfb39
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 74 deletions.
70 changes: 0 additions & 70 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,76 +190,6 @@ func TestSendBlocks(t *testing.T) {
require.Equal(t, int64(10), sentBlockNumber)
}

func TestRunBlocksUseBatching(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(1000)
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlocksFunc: func(_ context.Context, blocks []models.RPCBlock) error {
if len(blocks) == 0 {
return nil
}

// Fail if we're not sending a batch of blocks
require.Greater(t, len(blocks), 1)

next := sentBlockNumber + 1
for _, block := range blocks {
// We cannot send blocks out of order to DuneAPI
require.Equalf(t, next, block.BlockNumber, "expected block %d, got %d", next, block.BlockNumber)
next++
}

lastBlockNumber := blocks[len(blocks)-1].BlockNumber
atomic.StoreInt64(&sentBlockNumber, lastBlockNumber)
if lastBlockNumber >= maxBlockNumber {
// cancel execution when we have sent the last block
cancel()
return context.Canceled
}

return nil
},
PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
return nil
},
}
rpcClient := &jsonrpc_mock.BlockchainClientMock{
LatestBlockNumberFunc: func() (int64, error) {
return maxBlockNumber + 1, nil
},
BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
// Get blocks out of order by sleeping for a random amount of time
atomic.StoreInt64(&producedBlockNumber, blockNumber)
return models.RPCBlock{BlockNumber: blockNumber, Payload: []byte("block")}, nil
},
CloseFunc: func() error {
return nil
},
}
// Swap these to see logs
// logOutput := os.Stderr
logOutput := io.Discard
ing := ingester.New(
slog.New(slog.NewTextHandler(logOutput, nil)),
rpcClient,
duneapi,
ingester.Config{
MaxConcurrentRequests: 20, // fetch blocks in multiple goroutines
// big enough compared to the time spent in block by number to ensure batching. We panic
// in the mocked Dune client if we don't get a batch of blocks (more than one block).
BlockSubmitInterval: 50 * time.Millisecond,
SkipFailedBlocks: false,
},
nil, // progress
)

err := ing.Run(ctx, 1, -1) // run until canceled
require.ErrorIs(t, err, context.Canceled) // this is expected
require.GreaterOrEqual(t, sentBlockNumber, maxBlockNumber)
}

// TestRunBlocksOutOfOrder asserts that we can fetch blocks concurrently and that we ingest them in order
// even if they are produced out of order. We ensure they are produced out of order by sleeping a random amount of time.
func TestRunBlocksOutOfOrder(t *testing.T) {
Expand Down
4 changes: 0 additions & 4 deletions ingester/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ func (i *ingester) trySendCompletedBlocks(
nextBlockToSend int64,
) (int64, error) {
for {
if len(collectedBlocks) < maxBatchSize/10 {
// if we have very little to send, wait for next tick to avoid tiny batches impacting throughput
return nextBlockToSend, nil
}
nextBlock, err := i.trySendBlockBatch(ctx, collectedBlocks, nextBlockToSend, maxBatchSize)
if err != nil || nextBlock == nextBlockToSend {
return nextBlock, err
Expand Down

0 comments on commit 82dfb39

Please sign in to comment.