Skip to content

Commit

Permalink
Refactor beacon block gossip to make it more flexible.
Browse files Browse the repository at this point in the history
  • Loading branch information
mpaulucci committed Apr 23, 2024
1 parent d2728c3 commit 44f0fbc
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 8 deletions.
17 changes: 13 additions & 4 deletions lib/lambda_ethereum_consensus/beacon/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do
use GenServer

alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.P2P.Gossip
alias LambdaEthereumConsensus.StateTransition.Misc
alias LambdaEthereumConsensus.Validator
alias Types.BeaconState
Expand Down Expand Up @@ -176,8 +177,8 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do
new_state = %BeaconChainState{state | time: time}
new_logical_time = compute_logical_time(new_state)

if state.synced and old_logical_time != new_logical_time do
notify_subscribers(new_logical_time)
if old_logical_time != new_logical_time do
notify_subscribers(new_logical_time, state.synced)
end

{:noreply, new_state}
Expand Down Expand Up @@ -238,9 +239,17 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do
{slot, slot_third}
end

defp notify_subscribers(logical_time) do
defp notify_subscribers(logical_time, synced) do
log_new_slot(logical_time)
Validator.notify_tick(logical_time)

if synced do
Validator.notify_tick(logical_time)
end

case logical_time do
{slot, :first_third} -> Gossip.BeaconBlock.notify_slot(slot)
_ -> :ok
end
end

defp log_new_slot({slot, :first_third}) do
Expand Down
3 changes: 2 additions & 1 deletion lib/lambda_ethereum_consensus/beacon/beacon_node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
LambdaEthereumConsensus.Beacon.PendingBlocks,
LambdaEthereumConsensus.Beacon.SyncBlocks,
LambdaEthereumConsensus.P2P.GossipSub,
LambdaEthereumConsensus.P2P.Gossip.Attestation
LambdaEthereumConsensus.P2P.Gossip.Attestation,
LambdaEthereumConsensus.P2P.Gossip.BeaconBlock
] ++ validator_children

Supervisor.init(children, strategy: :one_for_all)
Expand Down
2 changes: 2 additions & 0 deletions lib/lambda_ethereum_consensus/beacon/sync_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
alias LambdaEthereumConsensus.Beacon.BeaconChain
alias LambdaEthereumConsensus.Beacon.PendingBlocks
alias LambdaEthereumConsensus.P2P.BlockDownloader
alias LambdaEthereumConsensus.P2P.Gossip
alias LambdaEthereumConsensus.StateTransition.Misc
alias Types.SignedBeaconBlock

Expand Down Expand Up @@ -72,6 +73,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do

if Enum.empty?(chunks) do
Logger.info("[Optimistic Sync] Sync completed")
Gossip.BeaconBlock.start()
else
Process.sleep(1000)
perform_sync(remaining_chunks)
Expand Down
86 changes: 86 additions & 0 deletions lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do
@moduledoc """
This module handles beacon block gossipsub topics.
"""
alias LambdaEthereumConsensus.Beacon.BeaconChain
alias LambdaEthereumConsensus.Beacon.PendingBlocks
alias LambdaEthereumConsensus.Libp2pPort
alias Types.SignedBeaconBlock

use GenServer

require Logger

@type state :: %{topic: String.t(), slot: Types.slot()}

##########################
### Public API
##########################

def start_link(init_arg) do
GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
end

def start() do
GenServer.call(__MODULE__, :start)
end

@spec notify_slot(Types.slot()) :: :ok
def notify_slot(slot) do
GenServer.cast(__MODULE__, {:slot_transition, slot})
end

##########################
### GenServer Callbacks
##########################

@impl true
@spec init(any()) :: {:ok, state()} | {:stop, any()}
def init(_init_arg) do
# TODO: this doesn't take into account fork digest changes
fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower)
slot = BeaconChain.get_current_slot()
topic_name = "/eth2/#{fork_context}/beacon_block/ssz_snappy"
Libp2pPort.join_topic(topic_name)
{:ok, %{topic: topic_name, slot: slot}}
end

@impl true
def handle_call(:start, _from, %{topic: topic_name} = state) do
Libp2pPort.subscribe_to_topic(topic_name)
{:reply, :ok, state}
end

@impl true
def handle_cast({:slot_transition, slot}, state) do
{:noreply, state |> Map.put(:slot, slot)}
end

@impl true
def handle_info({:gossipsub, {_topic, msg_id, message}}, %{slot: slot} = state) do
with {:ok, uncompressed} <- :snappyer.decompress(message),
{:ok, beacon_block} <- Ssz.from_ssz(uncompressed, SignedBeaconBlock),
:ok <- handle_beacon_block(beacon_block, slot) do
# TODO: validate before accepting
Libp2pPort.validate_message(msg_id, :accept)
else
{:error, _} -> Libp2pPort.validate_message(msg_id, :reject)
end

{:noreply, state}
end

@spec handle_beacon_block(SignedBeaconBlock.t(), Types.slot()) :: :ok | {:error, any}
defp handle_beacon_block(%SignedBeaconBlock{message: block} = signed_block, current_slot) do
# TODO: reject blocks from the future
if block.slot > current_slot - ChainSpec.get("SLOTS_PER_EPOCH") do
Logger.info("[Gossip] Block received", slot: block.slot)

PendingBlocks.add_block(signed_block)
:ok
else
Logger.warning("[Gossip] Block with slot #{block.slot} is too old", slot: current_slot)
{:error, :block_too_old}
end
end
end
2 changes: 0 additions & 2 deletions lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ defmodule LambdaEthereumConsensus.P2P.GossipSub do
alias LambdaEthereumConsensus.Beacon.BeaconChain
alias LambdaEthereumConsensus.P2P.Gossip.Consumer
alias LambdaEthereumConsensus.P2P.Gossip.Handler
alias Types.SignedBeaconBlock

def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
Expand All @@ -16,7 +15,6 @@ defmodule LambdaEthereumConsensus.P2P.GossipSub do
@impl true
def init(_opts) do
topics = [
{"beacon_block", SignedBeaconBlock, &Handler.handle_beacon_block/1},
{"beacon_aggregate_and_proof", Types.SignedAggregateAndProof,
&Handler.handle_beacon_aggregate_and_proof/1},
{"voluntary_exit", Types.SignedVoluntaryExit, &Handler.handle_voluntary_exit/1},
Expand Down
3 changes: 2 additions & 1 deletion lib/libp2p_port.ex
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
end

@doc """
Joins the given topic. This also starts receiving messages for the topic.
Joins the given topic.
This does not subscribe to the topic, use `subscribe_to_topic/2` for that.
"""
@spec join_topic(GenServer.server(), String.t()) :: :ok | {:error, String.t()}
def join_topic(pid \\ __MODULE__, topic_name) do
Expand Down

0 comments on commit 44f0fbc

Please sign in to comment.