-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Send batch of blocks from the main loop #37
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
bfb9cb2
to
d17408e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this code is super clean & nice.
I have nitpicks on the log lines and how to name the env/config variable for defining the "block submit interval".
I also think we need to have a test that shows we sending a batched request instead of a single block..
3c2104d
to
75cb211
Compare
6399e35
to
f0e0be9
Compare
if resp == nil { | ||
log.Warn("Retrying request", "error", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I saw a panic once on this, so safe guarding against that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2e022fa
to
8d7e472
Compare
db35c5d
to
f00a5d4
Compare
ingester/mainloop.go
Outdated
if !i.cfg.SkipFailedBlocks { | ||
return err | ||
} | ||
// We need to send an empty block downstream to indicate that this failed | ||
blocks <- models.RPCBlock{BlockNumber: blockNumber} | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realized we need to handle the case of failing to get a block from the RPC node. If we just proceed without sending on the channel we will never proceed, since the SendBlocks channel relies on the block numbers. So we need to pass it downstream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, we need to change the channels to be "blockRPCResp" and that can be RPCBlock OR an error + blockNumber
f00a5d4
to
11d3299
Compare
ingester/mainloop.go
Outdated
if block.Empty() { | ||
// We got an empty block from the RPC client goroutine, either fail or send an empty block downstream | ||
if !i.cfg.SkipFailedBlocks { | ||
i.log.Info("Received empty block, exiting", "number", block.BlockNumber) | ||
return errors.Errorf("empty block received") | ||
} | ||
i.log.Info("Received empty block", "number", block.BlockNumber) | ||
} | ||
|
||
nextNumberToSend = i.trySendCompletedBlocks(ctx, blocks, nextNumberToSend) | ||
i.log.Info("SendBlocks: Sent any completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend) | ||
collectedBlocks[block.BlockNumber] = block |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realized we need to handle the case of failing to get a block from the RPC node. If we just proceed without sending on the channel we will never proceed, since the SendBlocks channel relies on the block numbers. So we need to pass it downstream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wait, why don't we have:
if nextNumberToSend == block.Number && block.Empty() {
log.WARN...
if !skip { return .. }
nextNumberToSend +=1 // skip it
}
.. normal logic here, the trySendBlocks will skip that block..
ingester/mainloop.go
Outdated
// Skip block if it's empty and we're configured to skip empty blocks | ||
if i.cfg.SkipFailedBlocks && block.Empty() { | ||
nextNumberToSend++ | ||
continue | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only here do we properly skip the empty block
11d3299
to
3a457e0
Compare
3a457e0
to
6730e76
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
approving with comments. :-)
ingester/mainloop.go
Outdated
if !i.cfg.SkipFailedBlocks { | ||
return err | ||
} | ||
// We need to send an empty block downstream to indicate that this failed | ||
blocks <- models.RPCBlock{BlockNumber: blockNumber} | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, we need to change the channels to be "blockRPCResp" and that can be RPCBlock OR an error + blockNumber
select { | ||
case <-ctx.Done(): | ||
i.log.Info("SendBlocks: Context canceled, stopping") | ||
return ctx.Err() | ||
case block, ok := <-blocksCh: | ||
case block, ok := <-blocks: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
liked the old name more :)
ingester/mainloop.go
Outdated
if block.Empty() { | ||
// We got an empty block from the RPC client goroutine, either fail or send an empty block downstream | ||
if !i.cfg.SkipFailedBlocks { | ||
i.log.Info("Received empty block, exiting", "number", block.BlockNumber) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicate log line, move it above the if
ingester/mainloop.go
Outdated
if block.Empty() { | ||
// We got an empty block from the RPC client goroutine, either fail or send an empty block downstream | ||
if !i.cfg.SkipFailedBlocks { | ||
i.log.Info("Received empty block, exiting", "number", block.BlockNumber) | ||
return errors.Errorf("empty block received") | ||
} | ||
i.log.Info("Received empty block", "number", block.BlockNumber) | ||
} | ||
|
||
nextNumberToSend = i.trySendCompletedBlocks(ctx, blocks, nextNumberToSend) | ||
i.log.Info("SendBlocks: Sent any completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend) | ||
collectedBlocks[block.BlockNumber] = block |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wait, why don't we have:
if nextNumberToSend == block.Number && block.Empty() {
log.WARN...
if !skip { return .. }
nextNumberToSend +=1 // skip it
}
.. normal logic here, the trySendBlocks will skip that block..
ingester/mainloop.go
Outdated
// Outer loop: We might need to send multiple batch requests if our buffer is too big | ||
for _, ok := collectedBlocks[nextNumberToSend]; ok; _, ok = collectedBlocks[nextNumberToSend] { | ||
// Collect a blocks of blocks to send, only send those which are in order | ||
blocks := make([]models.RPCBlock, 0, len(collectedBlocks)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
blocks := make([]models.RPCBlock, 0, len(collectedBlocks)) | |
blocks := make([]models.RPCBlock, 0, maxBatchSize) |
ingester/mainloop_test.go
Outdated
for _, block := range blocks { | ||
// We cannot send blocks out of order to DuneAPI | ||
if block.BlockNumber != next { | ||
panic(fmt.Sprintf("expected block %d, got %d", next, block.BlockNumber)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use require.Equal(t, blockNumber, next)
|
||
// Test must fail if DuneAPI receives blocks out of order | ||
require.Equal(t, block.BlockNumber, sentBlockNumber+1) | ||
// Fail if we're not sending a batch of blocks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is function screaming to get out.
(aka, this helper function is useful in many tests where you wanna validate what we pass to SendBlocks()
require.ErrorIs(t, err, someRPCError) | ||
} | ||
|
||
// TestRunRPCNodeFails shows that we crash if the RPC client fails to fetch a block |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
outdated comment
require.GreaterOrEqual(t, sentBlockNumber, maxBlockNumber) | ||
} | ||
|
||
// TestRunRPCNodeFails shows that we crash if the RPC client fails to fetch a block |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is showing we fail if DuneAPI.SendBlocks()
fails right? so we shouldchange the name and comment?
@@ -5,3 +5,7 @@ type RPCBlock struct { | |||
// agnostic blob of data that is the block | |||
Payload []byte |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggest we add a Error field and use Errored() bool
instead of Empty()
This PR changes the node indexer from sending one block at a time to sending a batch of blocks. Earlier we implemented concurrent block fetching with buffering (#32). On a configurable interval (defaults to every second), we now check the buffer and send all possible blocks.
We add a flag for specifying whether you wish to skip a failed block or not. It's off by default. This means if all the retries to the RPC node fails for a given block, we will crash. This ensures no block gaps.