refactor: remove genserver behavior from pending blocks #1141

Merged
merged 46 commits into from
Jul 4, 2024
680447d
remove pending blocks state. It compiles, but get_blocks_with_status …
Arkenan Jun 3, 2024
e34441e
fix dialyzer
Arkenan Jun 3, 2024
0a38ad3
Merge remote-tracking branch 'origin/main' into replace_pending_block…
Arkenan Jun 3, 2024
4e2a163
readd underscore
Arkenan Jun 3, 2024
6f094b3
add status lists
Arkenan Jun 3, 2024
7d261c7
only use blocks wrappers
Arkenan Jun 4, 2024
9c97f55
Merge remote-tracking branch 'origin/main' into replace_pending_block…
Arkenan Jun 4, 2024
71100ab
Add tests and fix the cache to enable block updates
Arkenan Jun 4, 2024
a15f343
simplify pending blocks code for add_block
Arkenan Jun 4, 2024
f17486d
Merge remote-tracking branch 'origin/main' into replace_pending_block…
Arkenan Jun 4, 2024
5f486ea
fix fork choice call
Arkenan Jun 4, 2024
a98d289
remove unused alias
Arkenan Jun 4, 2024
e6afeec
add cases for the get_blocks_with_status calls
Arkenan Jun 4, 2024
02de936
take blockinfo out. Make the put a call in LRU cache. Fix nil/downloa…
Arkenan Jun 4, 2024
862d4d8
add block info
Arkenan Jun 4, 2024
3d808ef
fix lint
Arkenan Jun 4, 2024
1f5226d
Merge remote-tracking branch 'origin/main' into replace_pending_block…
Arkenan Jun 4, 2024
6dc0b04
change handle_info to handle_casts
Arkenan Jun 5, 2024
6eefe07
Make LRU cache writes to be calls
Arkenan Jun 6, 2024
4c5498a
Remove io inspects
Arkenan Jun 6, 2024
db68778
Merge branch 'main' into replace_pending_blocks_state_with_db
Arkenan Jun 6, 2024
41618b9
adding some logs to debug ci
Arkenan Jun 6, 2024
3297363
Merge branch 'main' into replace_pending_blocks_state_with_db
Arkenan Jun 10, 2024
efcd70d
Everything compiling
Arkenan Jun 11, 2024
9c972b7
Merge remote-tracking branch 'origin/main' into remove_genserver_beha…
Arkenan Jun 11, 2024
d9b300d
progress for friday
Arkenan Jun 14, 2024
fd1ed80
compile without warnings
Arkenan Jun 17, 2024
5ac39ee
add sync block download. fix lint dialyzer and sync_blocks.
Arkenan Jun 17, 2024
7d65755
fix tracer call, state return, send_request command, add some flatmaps
Arkenan Jun 17, 2024
a49e5da
add request id in go response
Arkenan Jun 17, 2024
e289b6c
Merge remote-tracking branch 'origin/main' into remove_genserver_beha…
Arkenan Jun 25, 2024
f71bd46
fix download blob recursive calls
Arkenan Jun 25, 2024
8c37cf5
fix sync requests
Arkenan Jun 25, 2024
e06036f
improve logs
Arkenan Jun 27, 2024
fb789d4
blocks move through libp2p port
Arkenan Jul 1, 2024
bd29ee6
remove recursion from process_blocks, leave a single call in add_bloc…
Arkenan Jul 1, 2024
706575a
merge with main
Arkenan Jul 2, 2024
71c3a89
fix lint
Arkenan Jul 2, 2024
12ff58e
fix alias in clock
Arkenan Jul 2, 2024
af69283
skip pending blocks until we re-add downloading
Arkenan Jul 2, 2024
17bfef2
remove beacon chain again
Arkenan Jul 2, 2024
6eb8e61
only call process_blocks when needed. Add logs.
Arkenan Jul 2, 2024
30b9fe3
fix linter
Arkenan Jul 2, 2024
006fd93
Added more retries for blob downloads. Moved the blobs handler to pri…
Arkenan Jul 2, 2024
fb61a14
remove IO.puts
Arkenan Jul 2, 2024
305b1ab
Update test/unit/p2p/requests_test.exs
Arkenan Jul 4, 2024
32 changes: 31 additions & 1 deletion docs/architecture.md
@@ -264,7 +264,37 @@ Asynchronously, a new task is started to recompute the new head, as this takes a

## Request-Response

**TO DO**: document how ports work for this.
Request-response is an on-demand protocol in which a node asks a peer directly for information and expects a response. It is used to request metadata about that peer for discovery purposes, or to request past information that will not appear when listening to gossip (useful for checkpoint sync).

It's implemented in the following way:

```mermaid
sequenceDiagram
participant req as Requesting Process
participant p2p as Libp2pPort
participant gomain as go libp2p main
participant goreq as request goroutine
req ->> req: send_request(peer_id, protocol_id, message)
req ->> p2p: send_protobuf(from: self())
activate p2p
p2p ->> gomain: %Command{}
deactivate p2p
req ->>+ req: receive_response()
gomain ->> gomain: handle_command()
gomain ->>+ goreq: go sendAsyncRequest()
goreq ->>- p2p: SendNotification(%Result{from, response, err})
p2p ->>p2p: handle_notification(%Result{from: from})
p2p ->> req: {:response, result}
deactivate req
```

In other words, a process that wants to make a request through Libp2pPort sends it along with its own pid, which is then included in the Command payload. The request is handled asynchronously on the Go side. Eventually the pid is included in the result and sent back to Libp2pPort, which then knows which process to dispatch the response to.

The specific kind of command (a request) is specified, but nothing identifies the result as a response rather than any other kind of result, nor the specific kind of response (e.g. a block download vs. a blob download). Currently, responses are only told apart because the requesting process waits for one specific kind of response, and nothing else, at a time.
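
The flow described above can be sketched in a few lines of Elixir. This is a minimal illustration, not the project's implementation: `PortSketch`, the map-shaped command and notification, and the echoing "Go side" (a spawned process) are all hypothetical stand-ins. It only shows how carrying `from` through the round trip lets the port dispatch the result to the right process.

```elixir
defmodule PortSketch do
  # Hypothetical stand-in for Libp2pPort: it forwards commands to a fake
  # "Go side" and routes each result back to the pid stored in `from`.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Called by the requesting process. Blocks until a response arrives or times out.
  def send_request(port, peer_id, protocol_id, message, timeout \\ 1_000) do
    command = %{from: self(), peer_id: peer_id, protocol_id: protocol_id, message: message}
    GenServer.cast(port, {:send_protobuf, command})
    receive_response(timeout)
  end

  # Nothing tags the kind of response: the caller simply waits for the next one.
  defp receive_response(timeout) do
    receive do
      {:response, result} -> result
    after
      timeout -> {:error, :timeout}
    end
  end

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_cast({:send_protobuf, command}, state) do
    port = self()

    # Simulates the request goroutine: it answers asynchronously and echoes
    # `from` back so the port knows where to dispatch the result.
    spawn(fn ->
      response = "echo:" <> command.message
      send(port, {:notification, %{from: command.from, response: response, err: nil}})
    end)

    {:noreply, state}
  end

  @impl true
  def handle_info({:notification, %{from: from, response: response, err: nil}}, state) do
    send(from, {:response, {:ok, response}})
    {:noreply, state}
  end

  def handle_info({:notification, %{from: from, err: err}}, state) do
    send(from, {:response, {:error, err}})
    {:noreply, state}
  end
end

{:ok, port} = PortSketch.start_link()
{:ok, "echo:ping"} = PortSketch.send_request(port, "peer-1", "/example/protocol", "ping")
```

Because the reply is a bare `{:response, result}` tuple, a caller with two requests in flight could not tell the responses apart, which is the limitation noted above.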

### Checkpoint sync

1 change: 0 additions & 1 deletion lib/lambda_ethereum_consensus/beacon/beacon_node.ex
@@ -43,7 +43,6 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
{LambdaEthereumConsensus.Beacon.Clock, {store.genesis_time, time}},
{LambdaEthereumConsensus.Libp2pPort, libp2p_args},
LambdaEthereumConsensus.P2P.IncomingRequests,
LambdaEthereumConsensus.Beacon.PendingBlocks,
LambdaEthereumConsensus.Beacon.SyncBlocks,
{Task.Supervisor, name: PruneStatesSupervisor},
{Task.Supervisor, name: PruneBlocksSupervisor},
4 changes: 2 additions & 2 deletions lib/lambda_ethereum_consensus/beacon/clock.ex
@@ -3,7 +3,7 @@ defmodule LambdaEthereumConsensus.Beacon.Clock do

use GenServer

alias LambdaEthereumConsensus.Beacon.PendingBlocks
alias LambdaEthereumConsensus.Libp2pPort
alias LambdaEthereumConsensus.Validator.ValidatorManager

require Logger
@@ -50,7 +50,7 @@ defmodule LambdaEthereumConsensus.Beacon.Clock do
new_state = %{state | time: time}

if time >= state.genesis_time do
PendingBlocks.on_tick(time)
Libp2pPort.on_tick(time)
# TODO: reduce time between ticks to account for gnosis' 5s slot time.
old_logical_time = compute_logical_time(state)
new_logical_time = compute_logical_time(new_state)
206 changes: 68 additions & 138 deletions lib/lambda_ethereum_consensus/beacon/pending_blocks.ex
@@ -4,13 +4,10 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
The main purpose of this module is making sure that a block's parent is already in the fork choice. If it's not, it will request it from the block downloader.
"""

use GenServer
require Logger

alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.P2P.BlobDownloader
alias LambdaEthereumConsensus.P2P.BlockDownloader
alias LambdaEthereumConsensus.Store.BlobDb
alias LambdaEthereumConsensus.Store.Blocks
alias Types.BlockInfo
@@ -22,137 +19,39 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
| {nil, :invalid | :download}
@type state :: nil

##########################
### Public API
##########################
@doc """
If the block is not present, it will be stored as pending.
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
In case it's ready to be processed
(the parent is present and already transitioned), then the block's state transition will be
calculated, resulting in a new saved block.
If the new state enables older blocks that were pending to be processed, they will be processed
immediately.
If blobs are missing, they will be requested.
"""
@spec add_block(SignedBeaconBlock.t()) :: :ok
def add_block(signed_block) do
GenServer.cast(__MODULE__, {:add_block, signed_block})
end

@spec on_tick(Types.uint64()) :: :ok
def on_tick(time) do
GenServer.cast(__MODULE__, {:on_tick, time})
end

##########################
### GenServer Callbacks
##########################

@impl true
@spec init(any) :: {:ok, state()}
def init(_opts) do
schedule_blocks_processing()
schedule_blocks_download()
schedule_blobs_download()
block_info = BlockInfo.from_block(signed_block)

{:ok, nil}
end
# If the block is new or was to be downloaded, we store it.
loaded_block = Blocks.get_block_info(block_info.root)

@spec handle_cast(any(), state()) :: {:noreply, state()}
if is_nil(loaded_block) or loaded_block.status == :download do
missing_blobs = missing_blobs(block_info)

@impl true
def handle_cast({:add_block, %SignedBeaconBlock{} = signed_block}, _state) do
block_info = BlockInfo.from_block(signed_block)
if Enum.empty?(missing_blobs) do
Blocks.new_block_info(block_info)
process_block_and_check_children(block_info)
else
BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/1, 30)

# If already processing or processed, ignore it
if not Blocks.has_block?(block_info.root) do
if Enum.empty?(missing_blobs(block_info)) do
block_info
else
block_info |> BlockInfo.change_status(:download_blobs)
|> BlockInfo.change_status(:download_blobs)
|> Blocks.new_block_info()
end
|> Blocks.new_block_info()
end

{:noreply, nil}
end

@impl true
def handle_cast({:on_tick, time}, state) do
ForkChoice.on_tick(time)
{:noreply, state}
end

@doc """
Iterates through the pending blocks and adds them to the fork choice if their parent is already in the fork choice.
"""
@impl true
@spec handle_info(atom(), state()) :: {:noreply, state()}
def handle_info(:process_blocks, _state) do
schedule_blocks_processing()
process_blocks()
{:noreply, nil}
end

@impl true
def handle_info(:download_blocks, _state) do
case Blocks.get_blocks_with_status(:download) do
{:ok, blocks_to_download} ->
blocks_to_download
|> Enum.take(16)
|> Enum.map(& &1.root)
|> BlockDownloader.request_blocks_by_root()
|> case do
{:ok, signed_blocks} ->
signed_blocks

{:error, reason} ->
Logger.debug("Block download failed: '#{reason}'")
[]
end
|> Enum.each(fn signed_block ->
signed_block
|> BlockInfo.from_block(:download)
|> Blocks.change_status(:download_blobs)
end)

{:error, reason} ->
Logger.error("[PendingBlocks] Failed to get blocks to download. Reason: #{reason}")
end

schedule_blocks_download()
{:noreply, nil}
end

@impl true
def handle_info(:download_blobs, _state) do
case Blocks.get_blocks_with_status(:download_blobs) do
{:ok, blocks_with_missing_blobs} ->
blocks_with_blobs =
blocks_with_missing_blobs
|> Enum.sort_by(fn %BlockInfo{} = block_info -> block_info.signed_block.message.slot end)
|> Enum.take(16)

blobs_to_download = Enum.flat_map(blocks_with_blobs, &missing_blobs/1)

downloaded_blobs =
blobs_to_download
|> BlobDownloader.request_blobs_by_root()
|> case do
{:ok, blobs} ->
blobs

{:error, reason} ->
Logger.debug("Blob download failed: '#{reason}'")
[]
end

Enum.each(downloaded_blobs, &BlobDb.store_blob/1)

# TODO: is it not possible that blobs were downloaded for one and not for another?
if length(downloaded_blobs) == length(blobs_to_download) do
Enum.each(blocks_with_blobs, &Blocks.change_status(&1, :pending))
end
end

schedule_blobs_download()
{:noreply, nil}
end

##########################
@@ -173,23 +72,37 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
end
end

# Processes a block. If it was transitioned or declared invalid, then process_blocks
# is called to check if there's any children that can now be processed. This function
# is only to be called when a new block is saved as pending, not when processing blocks
# in batch, to avoid unneeded recursion.
defp process_block_and_check_children(block_info) do
if process_block(block_info) in [:transitioned, :invalid] do
process_blocks()
end
end

defp process_block(block_info) do
if block_info.status != :pending do
Logger.error("Called process block for a block that's not ready: #{block_info}")
end

parent_root = block_info.signed_block.message.parent_root

case Blocks.get_block_info(parent_root) do
nil ->
Blocks.add_block_to_download(parent_root)
:download_pending

%BlockInfo{status: :invalid} ->
Blocks.change_status(block_info, :invalid)
:invalid

%BlockInfo{status: :transitioned} ->
case ForkChoice.on_block(block_info) do
:ok ->
Blocks.change_status(block_info, :transitioned)

# Block is valid. We immediately check if we can process another block.
# process_blocks()
:transitioned

{:error, reason} ->
Logger.error("[PendingBlocks] Saving block as invalid #{reason}",
@@ -198,13 +111,42 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
)

Blocks.change_status(block_info, :invalid)
:invalid
end

_other ->
:ok
end
end

defp process_blobs({:ok, blobs}), do: add_blobs(blobs)

defp process_blobs({:error, reason}) do
Logger.error("Error downloading blobs: #{inspect(reason)}")

# We might want to declare a block invalid here.
end

# To be used when a series of blobs are downloaded. Stores each blob.
# If there are blocks that can be processed, does so immediately.
defp add_blobs(blobs) do
Enum.map(blobs, fn blob ->
BlobDb.store_blob(blob)
Ssz.hash_tree_root!(blob.signed_block_header.message)
end)
|> Enum.uniq()
|> Enum.each(fn root ->
with %BlockInfo{} = block_info <- Blocks.get_block_info(root) do
# TODO: add a new missing blobs call if some blobs are still missing for a block.
if Enum.empty?(missing_blobs(block_info)) do
block_info
|> Blocks.change_status(:pending)
|> process_block_and_check_children()
end
end
end)
end

@spec missing_blobs(BlockInfo.t()) :: [Types.BlobIdentifier.t()]
defp missing_blobs(%BlockInfo{root: root, signed_block: signed_block}) do
signed_block.message.body.blob_kzg_commitments
@@ -222,16 +164,4 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
true
end
end

defp schedule_blocks_processing() do
Process.send_after(__MODULE__, :process_blocks, 500)
end

defp schedule_blobs_download() do
Process.send_after(__MODULE__, :download_blobs, 500)
end

defp schedule_blocks_download() do
Process.send_after(__MODULE__, :download_blocks, 1000)
end
end
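
The "process one block, then run a single batch pass" idea behind `process_block_and_check_children` can be illustrated with a toy, in-memory version. This is only a sketch under stated assumptions: `PendingSketch`, the plain map used as a block store, and the slot-ordered batch pass are hypothetical, and the real `process_blocks`, `ForkChoice.on_block` and download handling are not reproduced here.

```elixir
defmodule PendingSketch do
  # Hypothetical in-memory stand-in for the Blocks store:
  # a map of root => %{slot: integer, parent: root, status: atom}.

  # Processes one block; on :transitioned or :invalid, runs a single batch pass.
  def process_block_and_check_children(store, root) do
    {result, store} = process_block(store, root)
    if result in [:transitioned, :invalid], do: process_blocks(store), else: store
  end

  # One pass over pending blocks in slot order. It calls process_block directly,
  # never process_block_and_check_children, so there is no recursion.
  def process_blocks(store) do
    store
    |> Enum.filter(fn {_root, block} -> block.status == :pending end)
    |> Enum.sort_by(fn {_root, block} -> block.slot end)
    |> Enum.reduce(store, fn {root, _block}, acc -> elem(process_block(acc, root), 1) end)
  end

  # A block takes its parent's outcome; an unknown or pending parent leaves it untouched.
  defp process_block(store, root) do
    parent = Map.get(store, store[root].parent)

    case parent do
      %{status: :transitioned} -> {:transitioned, put_in(store, [root, :status], :transitioned)}
      %{status: :invalid} -> {:invalid, put_in(store, [root, :status], :invalid)}
      _other -> {:ok, store}
    end
  end
end

store = %{
  "a" => %{slot: 1, parent: "genesis", status: :transitioned},
  "b" => %{slot: 2, parent: "a", status: :pending},
  "c" => %{slot: 3, parent: "b", status: :pending},
  "d" => %{slot: 5, parent: "missing", status: :pending}
}

result = PendingSketch.process_block_and_check_children(store, "b")
```

In this toy run, `"b"` transitions because its parent already did, the batch pass then picks up `"c"`, and `"d"` stays pending because its parent is unknown. Sorting by slot is what lets a whole chain of children resolve in one pass in this sketch.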