diff --git a/lib/lambda_ethereum_consensus/beacon/beacon_chain.ex b/lib/lambda_ethereum_consensus/beacon/beacon_chain.ex index 51a413ac8..7b3210593 100644 --- a/lib/lambda_ethereum_consensus/beacon/beacon_chain.ex +++ b/lib/lambda_ethereum_consensus/beacon/beacon_chain.ex @@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do use GenServer alias LambdaEthereumConsensus.ForkChoice + alias LambdaEthereumConsensus.P2P.Gossip alias LambdaEthereumConsensus.StateTransition.Misc alias LambdaEthereumConsensus.Validator alias Types.BeaconState @@ -176,8 +177,8 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do new_state = %BeaconChainState{state | time: time} new_logical_time = compute_logical_time(new_state) - if state.synced and old_logical_time != new_logical_time do - notify_subscribers(new_logical_time) + if old_logical_time != new_logical_time do + notify_subscribers(new_logical_time, state.synced) end {:noreply, new_state} @@ -238,9 +239,17 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do {slot, slot_third} end - defp notify_subscribers(logical_time) do + defp notify_subscribers(logical_time, synced) do log_new_slot(logical_time) - Validator.notify_tick(logical_time) + + if synced do + Validator.notify_tick(logical_time) + end + + case logical_time do + {slot, :first_third} -> Gossip.BeaconBlock.notify_slot(slot) + _ -> :ok + end end defp log_new_slot({slot, :first_third}) do diff --git a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex index 4a0e4725e..791dec50e 100644 --- a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex +++ b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex @@ -60,7 +60,8 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do LambdaEthereumConsensus.Beacon.PendingBlocks, LambdaEthereumConsensus.Beacon.SyncBlocks, LambdaEthereumConsensus.P2P.GossipSub, - LambdaEthereumConsensus.P2P.Gossip.Attestation + LambdaEthereumConsensus.P2P.Gossip.Attestation, + LambdaEthereumConsensus.P2P.Gossip.BeaconBlock ] ++ validator_children Supervisor.init(children, strategy: :one_for_all) diff --git a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex index 40f5e35d3..c56bce3b0 100644 --- a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex @@ -10,6 +10,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do alias LambdaEthereumConsensus.Beacon.BeaconChain alias LambdaEthereumConsensus.Beacon.PendingBlocks alias LambdaEthereumConsensus.P2P.BlockDownloader + alias LambdaEthereumConsensus.P2P.Gossip alias LambdaEthereumConsensus.StateTransition.Misc alias Types.SignedBeaconBlock @@ -72,6 +73,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do if Enum.empty?(chunks) do Logger.info("[Optimistic Sync] Sync completed") + Gossip.BeaconBlock.start() else Process.sleep(1000) perform_sync(remaining_chunks) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex new file mode 100644 index 000000000..b732081aa --- /dev/null +++ b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex @@ -0,0 +1,86 @@ +defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do + @moduledoc """ + This module handles beacon block gossipsub topics. + """ + alias LambdaEthereumConsensus.Beacon.BeaconChain + alias LambdaEthereumConsensus.Beacon.PendingBlocks + alias LambdaEthereumConsensus.Libp2pPort + alias Types.SignedBeaconBlock + + use GenServer + + require Logger + + @type state :: %{topic: String.t(), slot: Types.slot()} + + ########################## + ### Public API + ########################## + + def start_link(init_arg) do + GenServer.start_link(__MODULE__, init_arg, name: __MODULE__) + end + + def start() do + GenServer.call(__MODULE__, :start) + end + + @spec notify_slot(Types.slot()) :: :ok + def notify_slot(slot) do + GenServer.cast(__MODULE__, {:slot_transition, slot}) + end + + ########################## + ### GenServer Callbacks + ########################## + + @impl true + @spec init(any()) :: {:ok, state()} | {:stop, any()} + def init(_init_arg) do + # TODO: this doesn't take into account fork digest changes + fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower) + slot = BeaconChain.get_current_slot() + topic_name = "/eth2/#{fork_context}/beacon_block/ssz_snappy" + Libp2pPort.join_topic(topic_name) + {:ok, %{topic: topic_name, slot: slot}} + end + + @impl true + def handle_call(:start, _from, %{topic: topic_name} = state) do + Libp2pPort.subscribe_to_topic(topic_name) + {:reply, :ok, state} + end + + @impl true + def handle_cast({:slot_transition, slot}, state) do + {:noreply, state |> Map.put(:slot, slot)} + end + + @impl true + def handle_info({:gossipsub, {_topic, msg_id, message}}, %{slot: slot} = state) do + with {:ok, uncompressed} <- :snappyer.decompress(message), + {:ok, beacon_block} <- Ssz.from_ssz(uncompressed, SignedBeaconBlock), + :ok <- handle_beacon_block(beacon_block, slot) do + # TODO: validate before accepting + Libp2pPort.validate_message(msg_id, :accept) + else + {:error, _} -> Libp2pPort.validate_message(msg_id, :reject) + end + + {:noreply, state} + end + + @spec handle_beacon_block(SignedBeaconBlock.t(), Types.slot()) :: :ok | {:error, any} + defp handle_beacon_block(%SignedBeaconBlock{message: block} = signed_block, current_slot) do + # TODO: reject blocks from the future + if block.slot > current_slot - ChainSpec.get("SLOTS_PER_EPOCH") do + Logger.info("[Gossip] Block received", slot: block.slot) + + PendingBlocks.add_block(signed_block) + :ok + else + Logger.warning("[Gossip] Block with slot #{block.slot} is too old", slot: current_slot) + {:error, :block_too_old} + end + end +end diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex b/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex index b9d42ca28..9924498b4 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex @@ -7,7 +7,6 @@ defmodule LambdaEthereumConsensus.P2P.GossipSub do alias LambdaEthereumConsensus.Beacon.BeaconChain alias LambdaEthereumConsensus.P2P.Gossip.Consumer alias LambdaEthereumConsensus.P2P.Gossip.Handler - alias Types.SignedBeaconBlock def start_link(opts) do Supervisor.start_link(__MODULE__, opts, name: __MODULE__) @@ -16,7 +15,6 @@ defmodule LambdaEthereumConsensus.P2P.GossipSub do @impl true def init(_opts) do topics = [ - {"beacon_block", SignedBeaconBlock, &Handler.handle_beacon_block/1}, {"beacon_aggregate_and_proof", Types.SignedAggregateAndProof, &Handler.handle_beacon_aggregate_and_proof/1}, {"voluntary_exit", Types.SignedVoluntaryExit, &Handler.handle_voluntary_exit/1}, diff --git a/lib/libp2p_port.ex b/lib/libp2p_port.ex index 05e299d35..17711e3f5 100644 --- a/lib/libp2p_port.ex +++ b/lib/libp2p_port.ex @@ -149,7 +149,8 @@ defmodule LambdaEthereumConsensus.Libp2pPort do end @doc """ - Joins the given topic. This also starts receiving messages for the topic. + Joins the given topic. + This does not subscribe to the topic, use `subscribe_to_topic/2` for that. """ @spec join_topic(GenServer.server(), String.t()) :: :ok | {:error, String.t()} def join_topic(pid \\ __MODULE__, topic_name) do