From 391c4cca5d891d9e0a33c5016f2c3d17b57847af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 22 Apr 2024 10:39:35 -0300 Subject: [PATCH] fix: each block processed added a `:process_blocks` (#1011) --- .../beacon/pending_blocks.ex | 70 +++++++++---------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex index 6c968ea88..c0fb52ece 100644 --- a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex @@ -71,7 +71,8 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do @impl true def handle_cast({:block_processed, block_root, true}, state) do # Block is valid. We immediately check if we can process another block. - state |> Map.delete(block_root) |> then(&handle_info(:process_blocks, &1)) + new_state = state |> Map.delete(block_root) |> process_blocks() + {:noreply, new_state} end @impl true @@ -80,45 +81,14 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do {:noreply, state |> Map.put(block_root, {nil, :invalid})} end - @spec handle_info(any(), state()) :: {:noreply, state()} - @doc """ Iterates through the pending blocks and adds them to the fork choice if their parent is already in the fork choice. """ @impl true @spec handle_info(atom(), state()) :: {:noreply, state()} def handle_info(:process_blocks, state) do - state - |> Enum.filter(fn {_, {_, s}} -> s == :pending end) - |> Enum.map(fn {root, {block, _}} -> {root, block} end) - |> Enum.sort_by(fn {_, signed_block} -> signed_block.message.slot end) - |> Enum.reduce(state, fn {block_root, signed_block}, state -> - parent_root = signed_block.message.parent_root - parent_status = get_block_status(state, parent_root) - - cond do - # If parent is invalid, block is invalid - parent_status == :invalid -> - state |> Map.put(block_root, {nil, :invalid}) - - # If parent isn't processed, block is pending - parent_status in [:processing, :pending, :download, :download_blobs] -> - state - - # If parent is not in fork choice, download parent - not Blocks.has_block?(parent_root) -> - state |> Map.put(parent_root, {nil, :download}) - - # If all the other conditions are false, add block to fork choice - true -> - ForkChoice.on_block(signed_block, block_root) - state |> Map.put(block_root, {signed_block, :processing}) - end - end) - |> then(fn state -> - schedule_blocks_processing() - {:noreply, state} - end) + schedule_blocks_processing() + {:noreply, process_blocks(state)} end @impl true @@ -191,6 +161,36 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do ### Private Functions ########################## + defp process_blocks(state) do + state + |> Enum.filter(fn {_, {_, s}} -> s == :pending end) + |> Enum.map(fn {root, {block, _}} -> {root, block} end) + |> Enum.sort_by(fn {_, signed_block} -> signed_block.message.slot end) + |> Enum.reduce(state, fn {block_root, signed_block}, state -> + parent_root = signed_block.message.parent_root + parent_status = get_block_status(state, parent_root) + + cond do + # If parent is invalid, block is invalid + parent_status == :invalid -> + state |> Map.put(block_root, {nil, :invalid}) + + # If parent isn't processed, block is pending + parent_status in [:processing, :pending, :download, :download_blobs] -> + state + + # If parent is not in fork choice, download parent + not Blocks.has_block?(parent_root) -> + state |> Map.put(parent_root, {nil, :download}) + + # If all the other conditions are false, add block to fork choice + true -> + ForkChoice.on_block(signed_block, block_root) + state |> Map.put(block_root, {signed_block, :processing}) + end + end) + end + @spec get_block_status(state(), Types.root()) :: block_status() defp get_block_status(state, block_root) do state |> Map.get(block_root, {nil, :unknown}) |> elem(1) @@ -216,7 +216,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do end def schedule_blocks_processing do - Process.send_after(__MODULE__, :process_blocks, 3000) + Process.send_after(__MODULE__, :process_blocks, 500) end def schedule_blobs_download do