Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: beacon block gossip to make it more flexible. #1022

Merged
merged 4 commits into from
Apr 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -91,7 +91,7 @@ compile-native: $(OUTPUT_DIR)/libp2p_nif.so $(OUTPUT_DIR)/libp2p_port

#🔨 compile-all: @ Compile the elixir project and its dependencies.
compile-all: $(CONFIG_FILE) compile-native $(PROTOBUF_EX_FILES) download-beacon-node-oapi
mix compile --warnings-as-errors
mix compile

#🗑️ clean: @ Remove the build files.
clean:
8 changes: 6 additions & 2 deletions lib/lambda_ethereum_consensus/beacon/beacon_chain.ex
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do
use GenServer

alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.P2P.Gossip
alias LambdaEthereumConsensus.StateTransition.Misc
alias LambdaEthereumConsensus.Validator
alias Types.BeaconState
@@ -176,7 +177,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do
new_state = %BeaconChainState{state | time: time}
new_logical_time = compute_logical_time(new_state)

if state.synced and old_logical_time != new_logical_time do
if old_logical_time != new_logical_time do
notify_subscribers(new_logical_time)
end

@@ -240,7 +241,10 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconChain do

defp notify_subscribers(logical_time) do
log_new_slot(logical_time)
Validator.notify_tick(logical_time)

Enum.each([Validator, Gossip.BeaconBlock], fn subscriber ->
GenServer.cast(subscriber, {:on_tick, logical_time})
end)
end

defp log_new_slot({slot, :first_third}) do
3 changes: 2 additions & 1 deletion lib/lambda_ethereum_consensus/beacon/beacon_node.ex
Original file line number Diff line number Diff line change
@@ -60,7 +60,8 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
LambdaEthereumConsensus.Beacon.PendingBlocks,
LambdaEthereumConsensus.Beacon.SyncBlocks,
LambdaEthereumConsensus.P2P.GossipSub,
LambdaEthereumConsensus.P2P.Gossip.Attestation
LambdaEthereumConsensus.P2P.Gossip.Attestation,
LambdaEthereumConsensus.P2P.Gossip.BeaconBlock
] ++ validator_children

Supervisor.init(children, strategy: :one_for_all)
2 changes: 2 additions & 0 deletions lib/lambda_ethereum_consensus/beacon/sync_blocks.ex
Original file line number Diff line number Diff line change
@@ -10,6 +10,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
alias LambdaEthereumConsensus.Beacon.BeaconChain
alias LambdaEthereumConsensus.Beacon.PendingBlocks
alias LambdaEthereumConsensus.P2P.BlockDownloader
alias LambdaEthereumConsensus.P2P.Gossip
alias LambdaEthereumConsensus.StateTransition.Misc
alias Types.SignedBeaconBlock

@@ -72,6 +73,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do

if Enum.empty?(chunks) do
Logger.info("[Optimistic Sync] Sync completed")
Gossip.BeaconBlock.start()
else
Process.sleep(1000)
perform_sync(remaining_chunks)
89 changes: 89 additions & 0 deletions lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do
@moduledoc """
This module handles beacon block gossipsub topics.
"""
alias LambdaEthereumConsensus.Beacon.BeaconChain
alias LambdaEthereumConsensus.Beacon.PendingBlocks
alias LambdaEthereumConsensus.Libp2pPort
alias Types.SignedBeaconBlock

use GenServer

require Logger

@type state :: %{topic: String.t(), slot: Types.slot()}

##########################
### Public API
##########################

def start_link(init_arg) do
GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
end

def start() do
GenServer.call(__MODULE__, :start)
end

@spec notify_slot(Types.slot()) :: :ok
def notify_slot(slot) do
GenServer.cast(__MODULE__, {:slot_transition, slot})
end

##########################
### GenServer Callbacks
##########################

@impl true
@spec init(any()) :: {:ok, state()} | {:stop, any()}
def init(_init_arg) do
# TODO: this doesn't take into account fork digest changes
fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower)
slot = BeaconChain.get_current_slot()
topic_name = "/eth2/#{fork_context}/beacon_block/ssz_snappy"
Libp2pPort.join_topic(topic_name)
{:ok, %{topic: topic_name, slot: slot}}
end

@impl true
def handle_call(:start, _from, %{topic: topic_name} = state) do
Libp2pPort.subscribe_to_topic(topic_name)
{:reply, :ok, state}
end

@impl true
def handle_cast({:on_tick, {slot, _}}, state) do
{:noreply, state |> Map.put(:slot, slot)}
end

@impl true
def handle_info({:gossipsub, {_topic, msg_id, message}}, %{slot: slot} = state) do
with {:ok, uncompressed} <- :snappyer.decompress(message),
{:ok, signed_block} <- Ssz.from_ssz(uncompressed, SignedBeaconBlock),
:ok <- validate(signed_block, slot) do
Logger.info("[Gossip] Block received", slot: signed_block.message.slot)
Libp2pPort.validate_message(msg_id, :accept)
PendingBlocks.add_block(signed_block)
else
{:ignore, reason} ->
Logger.warning("[Gossip] Block ignored, reason: #{inspect(reason)}", slot: slot)
Libp2pPort.validate_message(msg_id, :ignore)

{:error, reason} ->
Logger.warning("[Gossip] Block rejected, reason: #{inspect(reason)}", slot: slot)
Libp2pPort.validate_message(msg_id, :reject)
end

{:noreply, state}
end

@spec validate(SignedBeaconBlock.t(), Types.slot()) :: :ok | {:error, any}
defp validate(%SignedBeaconBlock{message: block}, current_slot) do
cond do
# TODO incorporate MAXIMUM_GOSSIP_CLOCK_DISPARITY into future block calculations
block.slot <= current_slot - ChainSpec.get("SLOTS_PER_EPOCH") -> {:ignore, :block_too_old}
block.slot > current_slot -> {:ignore, :block_from_future}
true -> :ok
end
end
end
2 changes: 0 additions & 2 deletions lib/lambda_ethereum_consensus/p2p/gossip/gossipsub.ex
Original file line number Diff line number Diff line change
@@ -7,7 +7,6 @@ defmodule LambdaEthereumConsensus.P2P.GossipSub do
alias LambdaEthereumConsensus.Beacon.BeaconChain
alias LambdaEthereumConsensus.P2P.Gossip.Consumer
alias LambdaEthereumConsensus.P2P.Gossip.Handler
alias Types.SignedBeaconBlock

def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
@@ -16,7 +15,6 @@ defmodule LambdaEthereumConsensus.P2P.GossipSub do
@impl true
def init(_opts) do
topics = [
{"beacon_block", SignedBeaconBlock, &Handler.handle_beacon_block/1},
{"beacon_aggregate_and_proof", Types.SignedAggregateAndProof,
&Handler.handle_beacon_aggregate_and_proof/1},
{"voluntary_exit", Types.SignedVoluntaryExit, &Handler.handle_voluntary_exit/1},
3 changes: 0 additions & 3 deletions lib/lambda_ethereum_consensus/validator/validator.ex
Original file line number Diff line number Diff line change
@@ -31,9 +31,6 @@ defmodule LambdaEthereumConsensus.Validator do
def notify_new_block(slot, head_root),
do: GenServer.cast(__MODULE__, {:new_block, slot, head_root})

def notify_tick(logical_time),
do: GenServer.cast(__MODULE__, {:on_tick, logical_time})

##########################
### GenServer Callbacks
##########################
3 changes: 2 additions & 1 deletion lib/libp2p_port.ex
Original file line number Diff line number Diff line change
@@ -149,7 +149,8 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
end

@doc """
Joins the given topic. This also starts receiving messages for the topic.
Joins the given topic.
This does not subscribe to the topic, use `subscribe_to_topic/2` for that.
"""
@spec join_topic(GenServer.server(), String.t()) :: :ok | {:error, String.t()}
def join_topic(pid \\ __MODULE__, topic_name) do
Loading