Skip to content

Commit

Permalink
fix: each block processed added a :process_blocks (#1011)
Browse files Browse the repository at this point in the history
  • Loading branch information
MegaRedHand authored Apr 22, 2024
1 parent b87159e commit 391c4cc
Showing 1 changed file with 35 additions and 35 deletions.
70 changes: 35 additions & 35 deletions lib/lambda_ethereum_consensus/beacon/pending_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
@impl true
def handle_cast({:block_processed, block_root, true}, state) do
# Block is valid. We immediately check if we can process another block.
state |> Map.delete(block_root) |> then(&handle_info(:process_blocks, &1))
new_state = state |> Map.delete(block_root) |> process_blocks()
{:noreply, new_state}
end

@impl true
Expand All @@ -80,45 +81,14 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
{:noreply, state |> Map.put(block_root, {nil, :invalid})}
end

@spec handle_info(any(), state()) :: {:noreply, state()}

@doc """
Iterates through the pending blocks and adds them to the fork choice if their parent is already in the fork choice.
"""
@impl true
@spec handle_info(atom(), state()) :: {:noreply, state()}
def handle_info(:process_blocks, state) do
state
|> Enum.filter(fn {_, {_, s}} -> s == :pending end)
|> Enum.map(fn {root, {block, _}} -> {root, block} end)
|> Enum.sort_by(fn {_, signed_block} -> signed_block.message.slot end)
|> Enum.reduce(state, fn {block_root, signed_block}, state ->
parent_root = signed_block.message.parent_root
parent_status = get_block_status(state, parent_root)

cond do
# If parent is invalid, block is invalid
parent_status == :invalid ->
state |> Map.put(block_root, {nil, :invalid})

# If parent isn't processed, block is pending
parent_status in [:processing, :pending, :download, :download_blobs] ->
state

# If parent is not in fork choice, download parent
not Blocks.has_block?(parent_root) ->
state |> Map.put(parent_root, {nil, :download})

# If all the other conditions are false, add block to fork choice
true ->
ForkChoice.on_block(signed_block, block_root)
state |> Map.put(block_root, {signed_block, :processing})
end
end)
|> then(fn state ->
schedule_blocks_processing()
{:noreply, state}
end)
schedule_blocks_processing()
{:noreply, process_blocks(state)}
end

@impl true
Expand Down Expand Up @@ -191,6 +161,36 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
### Private Functions
##########################

defp process_blocks(state) do
state
|> Enum.filter(fn {_, {_, s}} -> s == :pending end)
|> Enum.map(fn {root, {block, _}} -> {root, block} end)
|> Enum.sort_by(fn {_, signed_block} -> signed_block.message.slot end)
|> Enum.reduce(state, fn {block_root, signed_block}, state ->
parent_root = signed_block.message.parent_root
parent_status = get_block_status(state, parent_root)

cond do
# If parent is invalid, block is invalid
parent_status == :invalid ->
state |> Map.put(block_root, {nil, :invalid})

# If parent isn't processed, block is pending
parent_status in [:processing, :pending, :download, :download_blobs] ->
state

# If parent is not in fork choice, download parent
not Blocks.has_block?(parent_root) ->
state |> Map.put(parent_root, {nil, :download})

# If all the other conditions are false, add block to fork choice
true ->
ForkChoice.on_block(signed_block, block_root)
state |> Map.put(block_root, {signed_block, :processing})
end
end)
end

@spec get_block_status(state(), Types.root()) :: block_status()
defp get_block_status(state, block_root) do
state |> Map.get(block_root, {nil, :unknown}) |> elem(1)
Expand All @@ -216,7 +216,7 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
end

def schedule_blocks_processing do
Process.send_after(__MODULE__, :process_blocks, 3000)
Process.send_after(__MODULE__, :process_blocks, 500)
end

def schedule_blobs_download do
Expand Down

0 comments on commit 391c4cc

Please sign in to comment.