From 44f0fbc920ecc2a0491bf427092d148043888af7 Mon Sep 17 00:00:00 2001 From: Martin Paulucci Date: Tue, 23 Apr 2024 16:46:29 +0200 Subject: [PATCH 1/3] Refactor beacon block gossip to make it more flexible. --- .../beacon/beacon_chain.ex | 17 +++- .../beacon/beacon_node.ex | 3 +- .../beacon/sync_blocks.ex | 2 + .../p2p/gossip/beacon_block.ex | 86 +++++++++++++++++++ .../p2p/gossip/gossipsub.ex | 2 - lib/libp2p_port.ex | 3 +- 6 files changed, 105 insertions(+), 8 deletions(-) create mode 100644 lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex diff --git a/lib/lambda_ethereum_consensus/beacon/beacon_chain.ex b/lib/lambda_ethereum_consensus/beacon/beacon_chain.ex index 51a413ac8..7b3210593 100644 --- a/lib/lambda_ethereum_consensus/beacon/beacon_chain.ex +++ b/lib/lambda_ethereum_consensus/beacon/beacon_chain.ex @@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do use GenServer alias LambdaEthereumConsensus.ForkChoice + alias LambdaEthereumConsensus.P2P.Gossip alias LambdaEthereumConsensus.StateTransition.Misc alias LambdaEthereumConsensus.Validator alias Types.BeaconState @@ -176,8 +177,8 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do new_state = %BeaconChainState{state | time: time} new_logical_time = compute_logical_time(new_state) - if state.synced and old_logical_time != new_logical_time do - notify_subscribers(new_logical_time) + if old_logical_time != new_logical_time do + notify_subscribers(new_logical_time, state.synced) end {:noreply, new_state} @@ -238,9 +239,17 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do {slot, slot_third} end - defp notify_subscribers(logical_time) do + defp notify_subscribers(logical_time, synced) do log_new_slot(logical_time) - Validator.notify_tick(logical_time) + + if synced do + Validator.notify_tick(logical_time) + end + + case logical_time do + {slot, :first_third} -> Gossip.BeaconBlock.notify_slot(slot) + _ -> :ok + end end defp log_new_slot({slot, :first_third}) do diff --git a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex index 4a0e4725e..791dec50e 100644 --- a/lib/lambda_ethereum_consensus/beacon/beacon_node.ex +++ b/lib/lambda_ethereum_consensus/beacon/beacon_node.ex @@ -60,7 +60,8 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do LambdaEthereumConsensus.Beacon.PendingBlocks, LambdaEthereumConsensus.Beacon.SyncBlocks, LambdaEthereumConsensus.P2P.GossipSub, - LambdaEthereumConsensus.P2P.Gossip.Attestation + LambdaEthereumConsensus.P2P.Gossip.Attestation, + LambdaEthereumConsensus.P2P.Gossip.BeaconBlock ] ++ validator_children Supervisor.init(children, strategy: :one_for_all) diff --git a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex index 40f5e35d3..c56bce3b0 100644 --- a/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex +++ b/lib/lambda_ethereum_consensus/beacon/sync_blocks.ex @@ -10,6 +10,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do alias LambdaEthereumConsensus.Beacon.BeaconChain alias LambdaEthereumConsensus.Beacon.PendingBlocks alias LambdaEthereumConsensus.P2P.BlockDownloader + alias LambdaEthereumConsensus.P2P.Gossip alias LambdaEthereumConsensus.StateTransition.Misc alias Types.SignedBeaconBlock @@ -72,6 +73,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do if Enum.empty?(chunks) do Logger.info("[Optimistic Sync] Sync completed") + Gossip.BeaconBlock.start() else Process.sleep(1000) perform_sync(remaining_chunks) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex new file mode 100644 index 000000000..b732081aa --- /dev/null +++ b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex @@ -0,0 +1,86 @@ +defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do + @moduledoc """ + This module handles beacon block gossipsub topics. + """ + alias LambdaEthereumConsensus.Beacon.BeaconChain + alias LambdaEthereumConsensus.Beacon.PendingBlocks + alias LambdaEthereumConsensus.Libp2pPort + alias Types.SignedBeaconBlock + + use GenServer + + require Logger + + @type state :: %{topic: String.t(), slot: Types.slot()} + + ########################## + ### Public API + ########################## + + def start_link(init_arg) do + GenServer.start_link(__MODULE__, init_arg, name: __MODULE__) + end + + def start() do + GenServer.call(__MODULE__, :start) + end + + @spec notify_slot(Types.slot()) :: :ok + def notify_slot(slot) do + GenServer.cast(__MODULE__, {:slot_transition, slot}) + end + + ########################## + ### GenServer Callbacks + ########################## + + @impl true + @spec init(any()) :: {:ok, state()} | {:stop, any()} + def init(_init_arg) do + # TODO: this doesn't take into account fork digest changes + fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower) + slot = BeaconChain.get_current_slot() + topic_name = "/eth2/#{fork_context}/beacon_block/ssz_snappy" + Libp2pPort.join_topic(topic_name) + {:ok, %{topic: topic_name, slot: slot}} + end + + @impl true + def handle_call(:start, _from, %{topic: topic_name} = state) do + Libp2pPort.subscribe_to_topic(topic_name) + {:reply, :ok, state} + end + + @impl true + def handle_cast({:slot_transition, slot}, state) do + {:noreply, state |> Map.put(:slot, slot)} + end + + @impl true + def handle_info({:gossipsub, {_topic, msg_id, message}}, %{slot: slot} = state) do + with {:ok, uncompressed} <- :snappyer.decompress(message), + {:ok, beacon_block} <- Ssz.from_ssz(uncompressed, SignedBeaconBlock), + :ok <- handle_beacon_block(beacon_block, slot) do + # TODO: validate before accepting + Libp2pPort.validate_message(msg_id, :accept) + else + {:error, _} -> Libp2pPort.validate_message(msg_id, :reject) + end + + {:noreply, state} + end + + @spec handle_beacon_block(SignedBeaconBlock.t(), Types.slot()) :: :ok | {:error, any} + defp handle_beacon_block(%SignedBeaconBlock{message: block} = signed_block, current_slot) do + # TODO: reject blocks from the future + if block.slot > current_slot - ChainSpec.get("SLOTS_PER_EPOCH") do + Logger.info("[Gossip] Block received", slot: block.slot) + + PendingBlocks.add_block(signed_block) + :ok + else + Logger.warning("[Gossip] Block with slot #{block.slot} is too old", slot: current_slot) + {:error, :block_too_old} + end + end +end diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex b/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex index b9d42ca28..9924498b4 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex @@ -7,7 +7,6 @@ defmodule LambdaEthereumConsensus.P2P.GossipSub do alias LambdaEthereumConsensus.Beacon.BeaconChain alias LambdaEthereumConsensus.P2P.Gossip.Consumer alias LambdaEthereumConsensus.P2P.Gossip.Handler - alias Types.SignedBeaconBlock def start_link(opts) do Supervisor.start_link(__MODULE__, opts, name: __MODULE__) @@ -16,7 +15,6 @@ defmodule LambdaEthereumConsensus.P2P.GossipSub do @impl true def init(_opts) do topics = [ - {"beacon_block", SignedBeaconBlock, &Handler.handle_beacon_block/1}, {"beacon_aggregate_and_proof", Types.SignedAggregateAndProof, &Handler.handle_beacon_aggregate_and_proof/1}, {"voluntary_exit", Types.SignedVoluntaryExit, &Handler.handle_voluntary_exit/1}, diff --git a/lib/libp2p_port.ex b/lib/libp2p_port.ex index 05e299d35..17711e3f5 100644 --- a/lib/libp2p_port.ex +++ b/lib/libp2p_port.ex @@ -149,7 +149,8 @@ defmodule LambdaEthereumConsensus.Libp2pPort do end @doc """ - Joins the given topic. This also starts receiving messages for the topic. + Joins the given topic. + This does not subscribe to the topic, use `subscribe_to_topic/2` for that. """ @spec join_topic(GenServer.server(), String.t()) :: :ok | {:error, String.t()} def join_topic(pid \\ __MODULE__, topic_name) do From 0002e59d77d47ce8e4de0123db59eb390769f012 Mon Sep 17 00:00:00 2001 From: Martin Paulucci Date: Tue, 23 Apr 2024 18:53:36 +0200 Subject: [PATCH 2/3] Make on tick more generic. --- Makefile | 2 +- .../beacon/beacon_chain.ex | 15 +++++---------- .../p2p/gossip/beacon_block.ex | 2 +- .../validator/validator.ex | 3 --- 4 files changed, 7 insertions(+), 15 deletions(-) diff --git a/Makefile b/Makefile index 80e220909..802afd0df 100644 --- a/Makefile +++ b/Makefile @@ -91,7 +91,7 @@ compile-native: $(OUTPUT_DIR)/libp2p_nif.so $(OUTPUT_DIR)/libp2p_port #🔨 compile-all: @ Compile the elixir project and its dependencies. compile-all: $(CONFIG_FILE) compile-native $(PROTOBUF_EX_FILES) download-beacon-node-oapi - mix compile --warnings-as-errors + mix compile #🗑️ clean: @ Remove the build files. clean: diff --git a/lib/lambda_ethereum_consensus/beacon/beacon_chain.ex b/lib/lambda_ethereum_consensus/beacon/beacon_chain.ex index 7b3210593..4b0f2c35c 100644 --- a/lib/lambda_ethereum_consensus/beacon/beacon_chain.ex +++ b/lib/lambda_ethereum_consensus/beacon/beacon_chain.ex @@ -178,7 +178,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do new_logical_time = compute_logical_time(new_state) if old_logical_time != new_logical_time do - notify_subscribers(new_logical_time, state.synced) + notify_subscribers(new_logical_time) end {:noreply, new_state} @@ -239,17 +239,12 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do {slot, slot_third} end - defp notify_subscribers(logical_time, synced) do + defp notify_subscribers(logical_time) do log_new_slot(logical_time) - if synced do - Validator.notify_tick(logical_time) - end - - case logical_time do - {slot, :first_third} -> Gossip.BeaconBlock.notify_slot(slot) - _ -> :ok - end + Enum.each([Validator, Gossip.BeaconBlock], fn subscriber -> + GenServer.cast(subscriber, {:on_tick, logical_time}) + end) end defp log_new_slot({slot, :first_third}) do diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex index b732081aa..831a371b3 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex @@ -52,7 +52,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do end @impl true - def handle_cast({:slot_transition, slot}, state) do + def handle_cast({:on_tick, {slot, _}}, state) do {:noreply, state |> Map.put(:slot, slot)} end diff --git a/lib/lambda_ethereum_consensus/validator/validator.ex b/lib/lambda_ethereum_consensus/validator/validator.ex index 1944b016a..0697f14b1 100644 --- a/lib/lambda_ethereum_consensus/validator/validator.ex +++ b/lib/lambda_ethereum_consensus/validator/validator.ex @@ -31,9 +31,6 @@ defmodule LambdaEthereumConsensus.Validator do def notify_new_block(slot, head_root), do: GenServer.cast(__MODULE__, {:new_block, slot, head_root}) - def notify_tick(logical_time), - do: GenServer.cast(__MODULE__, {:on_tick, logical_time}) - ########################## ### GenServer Callbacks ########################## From d98e2c5f50b8c5a577c2d053bbb7728bd08cab3f Mon Sep 17 00:00:00 2001 From: Martin Paulucci Date: Wed, 24 Apr 2024 15:53:29 +0200 Subject: [PATCH 3/3] Add validations. --- .../p2p/gossip/beacon_block.ex | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex index 831a371b3..6dc24b19b 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex @@ -59,28 +59,31 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do @impl true def handle_info({:gossipsub, {_topic, msg_id, message}}, %{slot: slot} = state) do with {:ok, uncompressed} <- :snappyer.decompress(message), - {:ok, beacon_block} <- Ssz.from_ssz(uncompressed, SignedBeaconBlock), - :ok <- handle_beacon_block(beacon_block, slot) do - # TODO: validate before accepting + {:ok, signed_block} <- Ssz.from_ssz(uncompressed, SignedBeaconBlock), + :ok <- validate(signed_block, slot) do + Logger.info("[Gossip] Block received", slot: signed_block.message.slot) Libp2pPort.validate_message(msg_id, :accept) + PendingBlocks.add_block(signed_block) else - {:error, _} -> Libp2pPort.validate_message(msg_id, :reject) + {:ignore, reason} -> + Logger.warning("[Gossip] Block ignored, reason: #{inspect(reason)}", slot: slot) + Libp2pPort.validate_message(msg_id, :ignore) + + {:error, reason} -> + Logger.warning("[Gossip] Block rejected, reason: #{inspect(reason)}", slot: slot) + Libp2pPort.validate_message(msg_id, :reject) end {:noreply, state} end - @spec handle_beacon_block(SignedBeaconBlock.t(), Types.slot()) :: :ok | {:error, any} - defp handle_beacon_block(%SignedBeaconBlock{message: block} = signed_block, current_slot) do - # TODO: reject blocks from the future - if block.slot > current_slot - ChainSpec.get("SLOTS_PER_EPOCH") do - Logger.info("[Gossip] Block received", slot: block.slot) - - PendingBlocks.add_block(signed_block) - :ok - else - Logger.warning("[Gossip] Block with slot #{block.slot} is too old", slot: current_slot) - {:error, :block_too_old} + @spec validate(SignedBeaconBlock.t(), Types.slot()) :: :ok | {:error, any} + defp validate(%SignedBeaconBlock{message: block}, current_slot) do + cond do + # TODO incorporate MAXIMUM_GOSSIP_CLOCK_DISPARITY into future block calculations + block.slot <= current_slot - ChainSpec.get("SLOTS_PER_EPOCH") -> {:ignore, :block_too_old} + block.slot > current_slot -> {:ignore, :block_from_future} + true -> :ok end end end