
Ingester main loop #5

Merged
merged 2 commits into from
Jun 6, 2024

Conversation

Contributor

@msf msf commented Jun 5, 2024

This is the main loop logic. It connects:

  • the RPC node client that fetches blocks
  • the Dune API client that sends blocks to Dune

It has a few responsibilities:

  • track errors on either side (and later do something about them)
  • track whether it is behind or up to date
  • try to go fast
  • produce good logs of what's going on
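The responsibilities above can be sketched roughly like this (a minimal, self-contained sketch; all names such as `runLoop`, `sendToDune`, and `stats` are illustrative, not the PR's actual code):

```go
package main

import "fmt"

// Block stands in for whatever payload the RPC client produces.
type Block struct{ Number int64 }

// stats tracks the loop's responsibilities listed above.
type stats struct {
	duneErrors int
	lag        int64 // chain tip minus last ingested block; 0 means up to date
}

// runLoop drains blocks from the RPC side, forwards each one to sendToDune,
// counts errors, and tracks how far behind the chain tip we are.
func runLoop(blocks <-chan Block, sendToDune func(Block) error, chainTip int64) stats {
	var s stats
	for b := range blocks {
		if err := sendToDune(b); err != nil {
			s.duneErrors++ // later: retry or dead-letter, per the description
			continue
		}
		s.lag = chainTip - b.Number
	}
	return s
}

func main() {
	blocks := make(chan Block, 3)
	for i := int64(1); i <= 3; i++ {
		blocks <- Block{Number: i}
	}
	close(blocks)
	s := runLoop(blocks, func(Block) error { return nil }, 5)
	fmt.Printf("lag=%d duneErrors=%d\n", s.lag, s.duneErrors) // lag=2 duneErrors=0
}
```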

@msf msf requested review from vegarsti and helanto June 5, 2024 09:22
@msf msf force-pushed the step-3-main-control branch from c14985e to 466d500 on June 5, 2024 11:31
Member

@vegarsti vegarsti left a comment


Nice!!

ingester/ingester.go
case <-ctx.Done():
return ctx.Err()
case payload, ok := <-blocksCh:
// TODO: we should batch RPC blocks here before sending to Dune.
Member


Something like this?

	// Every N seconds: Send the batched blocks
	buffer := make([]*Block, 0, concurrency)
	ticker := time.NewTicker(batchInterval)
	for {
		select {
		case block := <-blocks:
			buffer = append(buffer, block)
			if len(buffer) == cap(buffer) {
				sendBlocks(buffer)
				buffer = buffer[:0] // Clear the buffer after sending.
			}
		case <-ticker.C:
			if len(buffer) > 0 {
				sendBlocks(buffer)
				buffer = buffer[:0] // Clear the buffer after sending.
			}
		}
	}

Contributor Author

@msf msf Jun 5, 2024


Exactly, but batchInterval should be something like 150 ms, so we don't cause delays; it's only for the case where we can harvest fast (or it forces us to harvest fast?)
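A runnable version of the suggestion above, using the ~150 ms interval mentioned here (`batchBlocks`, `maxBatch`, and the stub `send` callback are all illustrative names, not from the PR):

```go
package main

import (
	"fmt"
	"time"
)

type Block struct{ Number int64 }

const (
	maxBatch      = 4
	batchInterval = 150 * time.Millisecond // short, so batching never adds real delay
)

// batchBlocks buffers incoming blocks and calls send with a full batch of
// maxBatch, or with whatever has accumulated when batchInterval elapses.
// It returns once blocks is closed and the remainder has been flushed.
// Note: a real implementation should copy the slice before handing it to
// send, since the buffer is reused afterwards.
func batchBlocks(blocks <-chan *Block, send func([]*Block)) {
	buffer := make([]*Block, 0, maxBatch)
	ticker := time.NewTicker(batchInterval)
	defer ticker.Stop()
	for {
		select {
		case b, ok := <-blocks:
			if !ok { // channel closed: flush the tail and stop
				if len(buffer) > 0 {
					send(buffer)
				}
				return
			}
			buffer = append(buffer, b)
			if len(buffer) == cap(buffer) { // full batch: send right away
				send(buffer)
				buffer = buffer[:0]
			}
		case <-ticker.C: // interval elapsed: don't sit on a partial batch
			if len(buffer) > 0 {
				send(buffer)
				buffer = buffer[:0]
			}
		}
	}
}

func main() {
	blocks := make(chan *Block, 6)
	for i := int64(1); i <= 6; i++ {
		blocks <- &Block{Number: i}
	}
	close(blocks)
	batchBlocks(blocks, func(batch []*Block) {
		fmt.Printf("sending batch of %d\n", len(batch))
	})
}
```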

return latest
}

func (i *ingester) ReportProgress(ctx context.Context) error {
Member


Running this in a separate goroutine is such a great idea!

@vegarsti
Member

vegarsti commented Jun 5, 2024

For the main loop, when we start trying to do reliable delivery, I'm thinking we might want to use more of a pipeline with channels

  • one channel to send block numbers on. 2 goroutines to send block numbers into this channel: one for incrementing the next block number, another for checking Dune API for missing block numbers. Or it reads from the DLQ and sends those blocks on the channel
  • one channel to send blocks on. Many goroutines fetch blocks and send into this channel.
  • one goroutine reads blocks, buffers, and sends to the API

What do you think? Wrote a prototype here: https://gist.github.com/vegarsti/bc4d9dceec87666f456f3fbf6b213edb
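The three stages described above can be sketched as channels connected by goroutines (a rough, self-contained sketch; `runPipeline` and the fetcher count are made-up names, and the linked gist is the actual prototype):

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// runPipeline produces block numbers 1..n, "fetches" them with the given
// number of concurrent fetcher goroutines, and returns everything the final
// consumer received, sorted (fetch completion order is nondeterministic).
func runPipeline(n int64, fetchers int) []int64 {
	numbers := make(chan int64)
	blocks := make(chan int64)

	// Stage 1: one goroutine produces block numbers.
	go func() {
		for i := int64(1); i <= n; i++ {
			numbers <- i
		}
		close(numbers)
	}()

	// Stage 2: many goroutines fetch blocks concurrently.
	var wg sync.WaitGroup
	for i := 0; i < fetchers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for num := range numbers {
				blocks <- num // a real fetcher would call the RPC node here
			}
		}()
	}
	go func() { wg.Wait(); close(blocks) }()

	// Stage 3: a single consumer reads blocks (and would buffer + send to the API).
	var got []int64
	for b := range blocks {
		got = append(got, b)
	}
	sort.Slice(got, func(i, j int) bool { return got[i] < got[j] })
	return got
}

func main() {
	fmt.Println(runPipeline(10, 3)) // [1 2 3 4 5 6 7 8 9 10]
}
```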

Contributor

@helanto helanto left a comment


As far as I can tell, LGTM :)

return latestBlockNumber
}

for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {
Contributor


In the future, do we consider parallelizing this part? I mean, consuming blocks in parallel?

Contributor Author


agreed, like Vegard proposed above

@msf msf merged commit aac75cb into main Jun 6, 2024
1 check passed