Skip to content

Commit

Permalink
refactor: remove genserver behavior from pending blocks (#1141)
Browse files Browse the repository at this point in the history
Co-authored-by: Avila Gastón <72628438+avilagaston9@users.noreply.github.com>
  • Loading branch information
Arkenan and avilagaston9 committed Jul 4, 2024
1 parent 2c9a578 commit 7b1f204
Show file tree
Hide file tree
Showing 24 changed files with 633 additions and 330 deletions.
32 changes: 31 additions & 1 deletion docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,37 @@ Asynchronously, a new task is started to recompute the new head, as this takes a

## Request-Response

**TO DO**: document how ports work for this.
Request-response is an on-demand protocol where a node asks for information directly to a peer and expects a response. This may be to request metadata that corresponds to that peer for discovery purposes, or to request information from the past that will not appear on when listening to gossip (useful for checkpoint sync).

It's implemented in the following way:

```mermaid
sequenceDiagram
participant req as Requesting Process
participant p2p as Libp2pPort
participant gomain as go libp2p main
participant goreq as request goroutine
req ->> req: send_request(peer_id, protocol_id, message)
req ->> p2p: send_protobuf(from: self())
activate p2p
p2p ->> gomain: %Command{}
deactivate p2p
req ->>+ req: receive_response()
gomain ->> gomain: handle_command()
gomain ->>+ goreq: go sendAsyncRequest()
goreq ->>- p2p: SendNotification(%Result{from, response, err})
p2p ->>p2p: handle_notification(%Result{from: from})
p2p ->> req: {:response, result}
deactivate req
```

Explained, a process that wants to request something from Libp2pPort sends a request with its own pid, which is then included in the Command payload. The request is handled asynchronously in the go side, and eventually, the pid is included in the response, and sent back to LibP2PPort, who now knows to which process it needs to be dispatched.

The specific kind of command (a request) is specified, but there's nothing identifying this is a response vs any other kind of result, or the specific kind of response (e.g. a block download vs a blob download). Currently the only way this is handled differentially is because the pid is waiting for a specific kind of response and for nothing else at a time.

### Checkpoint sync

Expand Down
1 change: 0 additions & 1 deletion lib/lambda_ethereum_consensus/beacon/beacon_node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
[
{LambdaEthereumConsensus.Beacon.Clock, {store.genesis_time, time}},
{LambdaEthereumConsensus.Libp2pPort, libp2p_args},
LambdaEthereumConsensus.Beacon.PendingBlocks,
LambdaEthereumConsensus.Beacon.SyncBlocks,
{Task.Supervisor, name: PruneStatesSupervisor},
{Task.Supervisor, name: PruneBlocksSupervisor},
Expand Down
4 changes: 2 additions & 2 deletions lib/lambda_ethereum_consensus/beacon/clock.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule LambdaEthereumConsensus.Beacon.Clock do

use GenServer

alias LambdaEthereumConsensus.Beacon.PendingBlocks
alias LambdaEthereumConsensus.Libp2pPort
alias LambdaEthereumConsensus.Validator.ValidatorManager

require Logger
Expand Down Expand Up @@ -50,7 +50,7 @@ defmodule LambdaEthereumConsensus.Beacon.Clock do
new_state = %{state | time: time}

if time >= state.genesis_time do
PendingBlocks.on_tick(time)
Libp2pPort.on_tick(time)
# TODO: reduce time between ticks to account for gnosis' 5s slot time.
old_logical_time = compute_logical_time(state)
new_logical_time = compute_logical_time(new_state)
Expand Down
206 changes: 68 additions & 138 deletions lib/lambda_ethereum_consensus/beacon/pending_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,10 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
The main purpose of this module is making sure that a blocks parent is already in the fork choice. If it's not, it will request it to the block downloader.
"""

use GenServer
require Logger

alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.P2P.BlobDownloader
alias LambdaEthereumConsensus.P2P.BlockDownloader
alias LambdaEthereumConsensus.Store.BlobDb
alias LambdaEthereumConsensus.Store.Blocks
alias Types.BlockInfo
Expand All @@ -22,137 +19,39 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
| {nil, :invalid | :download}
@type state :: nil

##########################
### Public API
##########################
@doc """
If the block is not present, it will be stored as pending.
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
In case it's ready to be processed
(the parent is present and already transitioned), then the block's state transition will be
calculated, resulting in a new saved block.
If the new state enables older blocks that were pending to be processed, they will be processed
immediately.
If blobs are missing, they will be requested.
"""
@spec add_block(SignedBeaconBlock.t()) :: :ok
def add_block(signed_block) do
GenServer.cast(__MODULE__, {:add_block, signed_block})
end

@spec on_tick(Types.uint64()) :: :ok
def on_tick(time) do
GenServer.cast(__MODULE__, {:on_tick, time})
end

##########################
### GenServer Callbacks
##########################

@impl true
@spec init(any) :: {:ok, state()}
def init(_opts) do
schedule_blocks_processing()
schedule_blocks_download()
schedule_blobs_download()
block_info = BlockInfo.from_block(signed_block)

{:ok, nil}
end
# If the block is new or was to be downloaded, we store it.
loaded_block = Blocks.get_block_info(block_info.root)

@spec handle_cast(any(), state()) :: {:noreply, state()}
if is_nil(loaded_block) or loaded_block.status == :download do
missing_blobs = missing_blobs(block_info)

@impl true
def handle_cast({:add_block, %SignedBeaconBlock{} = signed_block}, _state) do
block_info = BlockInfo.from_block(signed_block)
if Enum.empty?(missing_blobs) do
Blocks.new_block_info(block_info)
process_block_and_check_children(block_info)
else
BlobDownloader.request_blobs_by_root(missing_blobs, &process_blobs/1, 30)

# If already processing or processed, ignore it
if not Blocks.has_block?(block_info.root) do
if Enum.empty?(missing_blobs(block_info)) do
block_info
else
block_info |> BlockInfo.change_status(:download_blobs)
|> BlockInfo.change_status(:download_blobs)
|> Blocks.new_block_info()
end
|> Blocks.new_block_info()
end

{:noreply, nil}
end

@impl true
def handle_cast({:on_tick, time}, state) do
ForkChoice.on_tick(time)
{:noreply, state}
end

@doc """
Iterates through the pending blocks and adds them to the fork choice if their parent is already in the fork choice.
"""
@impl true
@spec handle_info(atom(), state()) :: {:noreply, state()}
def handle_info(:process_blocks, _state) do
schedule_blocks_processing()
process_blocks()
{:noreply, nil}
end

@impl true
def handle_info(:download_blocks, _state) do
case Blocks.get_blocks_with_status(:download) do
{:ok, blocks_to_download} ->
blocks_to_download
|> Enum.take(16)
|> Enum.map(& &1.root)
|> BlockDownloader.request_blocks_by_root()
|> case do
{:ok, signed_blocks} ->
signed_blocks

{:error, reason} ->
Logger.debug("Block download failed: '#{reason}'")
[]
end
|> Enum.each(fn signed_block ->
signed_block
|> BlockInfo.from_block(:download)
|> Blocks.change_status(:download_blobs)
end)

{:error, reason} ->
Logger.error("[PendingBlocks] Failed to get blocks to download. Reason: #{reason}")
end

schedule_blocks_download()
{:noreply, nil}
end

@impl true
def handle_info(:download_blobs, _state) do
case Blocks.get_blocks_with_status(:download_blobs) do
{:ok, blocks_with_missing_blobs} ->
blocks_with_blobs =
blocks_with_missing_blobs
|> Enum.sort_by(fn %BlockInfo{} = block_info -> block_info.signed_block.message.slot end)
|> Enum.take(16)

blobs_to_download = Enum.flat_map(blocks_with_blobs, &missing_blobs/1)

downloaded_blobs =
blobs_to_download
|> BlobDownloader.request_blobs_by_root()
|> case do
{:ok, blobs} ->
blobs

{:error, reason} ->
Logger.debug("Blob download failed: '#{reason}'")
[]
end

Enum.each(downloaded_blobs, &BlobDb.store_blob/1)

# TODO: is it not possible that blobs were downloaded for one and not for another?
if length(downloaded_blobs) == length(blobs_to_download) do
Enum.each(blocks_with_blobs, &Blocks.change_status(&1, :pending))
end
end

schedule_blobs_download()
{:noreply, nil}
end

##########################
Expand All @@ -173,23 +72,37 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
end
end

# Processes a block. If it was transitioned or declared invalid, then process_blocks
# is called to check if there's any children that can now be processed. This function
# is only to be called when a new block is saved as pending, not when processing blocks
# in batch, to avoid unneeded recursion.
defp process_block_and_check_children(block_info) do
if process_block(block_info) in [:transitioned, :invalid] do
process_blocks()
end
end

defp process_block(block_info) do
if block_info.status != :pending do
Logger.error("Called process block for a block that's not ready: #{block_info}")
end

parent_root = block_info.signed_block.message.parent_root

case Blocks.get_block_info(parent_root) do
nil ->
Blocks.add_block_to_download(parent_root)
:download_pending

%BlockInfo{status: :invalid} ->
Blocks.change_status(block_info, :invalid)
:invalid

%BlockInfo{status: :transitioned} ->
case ForkChoice.on_block(block_info) do
:ok ->
Blocks.change_status(block_info, :transitioned)

# Block is valid. We immediately check if we can process another block.
# process_blocks()
:transitioned

{:error, reason} ->
Logger.error("[PendingBlocks] Saving block as invalid #{reason}",
Expand All @@ -198,13 +111,42 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
)

Blocks.change_status(block_info, :invalid)
:invalid
end

_other ->
:ok
end
end

defp process_blobs({:ok, blobs}), do: add_blobs(blobs)

defp process_blobs({:error, reason}) do
Logger.error("Error downloading blobs: #{inspect(reason)}")

# We might want to declare a block invalid here.
end

# To be used when a series of blobs are downloaded. Stores each blob.
# If there are blocks that can be processed, does so immediately.
defp add_blobs(blobs) do
Enum.map(blobs, fn blob ->
BlobDb.store_blob(blob)
Ssz.hash_tree_root!(blob.signed_block_header.message)
end)
|> Enum.uniq()
|> Enum.each(fn root ->
with %BlockInfo{} = block_info <- Blocks.get_block_info(root) do
# TODO: add a new missing blobs call if some blobs are still missing for a block.
if Enum.empty?(missing_blobs(block_info)) do
block_info
|> Blocks.change_status(:pending)
|> process_block_and_check_children()
end
end
end)
end

@spec missing_blobs(BlockInfo.t()) :: [Types.BlobIdentifier.t()]
defp missing_blobs(%BlockInfo{root: root, signed_block: signed_block}) do
signed_block.message.body.blob_kzg_commitments
Expand All @@ -222,16 +164,4 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
true
end
end

defp schedule_blocks_processing() do
Process.send_after(__MODULE__, :process_blocks, 500)
end

defp schedule_blobs_download() do
Process.send_after(__MODULE__, :download_blobs, 500)
end

defp schedule_blocks_download() do
Process.send_after(__MODULE__, :download_blocks, 1000)
end
end
Loading

0 comments on commit 7b1f204

Please sign in to comment.