Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: collect attestations during slot #890

Merged
merged 7 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/lambda_ethereum_consensus/beacon/beacon_node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ defmodule LambdaEthereumConsensus.Beacon.BeaconNode do
LambdaEthereumConsensus.Beacon.PendingBlocks,
LambdaEthereumConsensus.Beacon.SyncBlocks,
LambdaEthereumConsensus.P2P.GossipSub,
LambdaEthereumConsensus.P2P.Gossip.Attestation,
# TODO: move checkpoint sync outside and move this to application.ex
{LambdaEthereumConsensus.Validator, {head_slot, head_root}}
]
Expand Down
69 changes: 69 additions & 0 deletions lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,21 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
@moduledoc """
This module handles attestation gossipsub topics.
"""
use GenServer

alias LambdaEthereumConsensus.Beacon.BeaconChain
alias LambdaEthereumConsensus.Libp2pPort
alias LambdaEthereumConsensus.P2P
alias LambdaEthereumConsensus.SszEx
alias LambdaEthereumConsensus.StateTransition.Misc

@subnet_id_start byte_size("/eth2/00000000/beacon_attestation_")
@subnet_id_end byte_size("/ssz_snappy")

def start_link(init_arg) do
GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
end

def join(subnet_id) do
topic = get_topic_name(subnet_id)
Libp2pPort.join_topic(topic)
Expand Down Expand Up @@ -55,4 +64,64 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
next_fork_epoch: Constants.far_future_epoch()
}
end

def collect(subnet_id, attestation_data) do
GenServer.call(__MODULE__, {:collect, subnet_id, attestation_data})
join(subnet_id)
end

def stop_collecting(subnet_id) do
leave(subnet_id)
GenServer.call(__MODULE__, {:stop_collecting, subnet_id})
end

@impl true
def init(_init_arg) do
{:ok, %{attnets: %{}, attestations: %{}}}
end

@impl true
def handle_call({:collect, subnet_id, attestation_data}, _from, state) do
new_state = %{state | attnets: Map.put(state.attnets, subnet_id, attestation_data)}
{:reply, :ok, new_state}
end

def handle_call({:stop_collecting, subnet_id}, _from, state) do
if Map.has_key?(state.attnets, subnet_id) do
{collected, atts} = Map.pop(state.attestations, subnet_id, [])
new_state = %{state | attnets: Map.delete(state.attnets, subnet_id), attestations: atts}
{:reply, {:ok, collected}, new_state}
else
{:reply, {:error, "subnet not joined"}, state}
end
end

@impl true
def handle_info({:gossipsub, {topic, msg_id, message}}, state) do
subnet_id = extract_subnet_id(topic)

with {:ok, uncompressed} <- :snappyer.decompress(message),
{:ok, attestation} <- Ssz.from_ssz(uncompressed, Types.Attestation) do
# TODO: validate before accepting
Libp2pPort.validate_message(msg_id, :accept)
new_state = store_attestation(subnet_id, state, attestation)
{:noreply, new_state}
else
{:error, _} -> Libp2pPort.validate_message(msg_id, :reject)
end
end

defp extract_subnet_id(topic) do
String.slice(topic, @subnet_id_start..-(@subnet_id_end + 1)) |> String.to_integer()
end

defp store_attestation(subnet_id, %{attestations: attestations} = state, attestation) do
# TODO: compare attestation with attestation_data
if Map.has_key?(attestation, subnet_id) do
attestations = Map.update(attestations, subnet_id, [], &[attestation | &1])
%{state | attestations: attestations}
else
state
end
end
end
76 changes: 43 additions & 33 deletions lib/lambda_ethereum_consensus/validator/validator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ defmodule LambdaEthereumConsensus.Validator do
epoch = Misc.compute_epoch_at_slot(slot)
beacon = fetch_target_state(epoch, root)
duties = maybe_update_duties(state.duties, beacon, epoch, state.validator)
join_subnets_for_duties(duties, beacon, epoch)
join_subnets_for_duties(duties)
log_duties(duties, state.validator.index)
{:noreply, %{state | duties: duties}}
end
Expand All @@ -51,6 +51,7 @@ defmodule LambdaEthereumConsensus.Validator do
new_state = update_state(state, slot, head_root)

if should_attest?(state, slot), do: attest(state)
maybe_publish_aggregate(state, slot)

{:noreply, new_state}
end
Expand All @@ -71,7 +72,7 @@ defmodule LambdaEthereumConsensus.Validator do
shift_duties(state.duties, epoch, last_epoch)
|> maybe_update_duties(new_beacon, epoch, state.validator)

move_subnets(state.duties, new_duties, new_beacon, epoch)
move_subnets(state.duties, new_duties)
log_duties(new_duties, state.validator.index)

%{state | slot: slot, root: head_root, duties: new_duties}
Expand Down Expand Up @@ -109,22 +110,22 @@ defmodule LambdaEthereumConsensus.Validator do

defp maybe_update_attester_duties(duties, _, _, _), do: duties

defp compute_attester_duty(duties, index, beacon_state, epoch, validator) when index in 0..1 do
defp compute_attester_duty(duties, index, beacon_state, base_epoch, validator)
when index in 0..1 do
epoch = base_epoch + index
# Can't fail
{:ok, duty} = Utils.get_committee_assignment(beacon_state, epoch + index, validator.index)
duty = update_with_aggregation_duty(duty, beacon_state, validator.privkey)
put_elem(duties, index, duty)
end
{:ok, duty} = Utils.get_committee_assignment(beacon_state, epoch, validator.index)

defp move_subnets(%{attester: {old_ep0, old_ep1}}, %{attester: {ep0, ep1}}, beacon_state, epoch) do
[old_subnet0, new_subnet0] =
compute_subnet_ids_for_duties([old_ep0, ep0], beacon_state, epoch)
duty =
update_with_aggregation_duty(duty, beacon_state, validator.privkey)
|> update_with_subnet_id(beacon_state, epoch)

[old_subnet1, new_subnet1] =
compute_subnet_ids_for_duties([old_ep1, ep1], beacon_state, epoch + 1)
put_elem(duties, index, duty)
end

old_subnets = MapSet.new([old_subnet0, old_subnet1])
new_subnets = MapSet.new([new_subnet0, new_subnet1])
defp move_subnets(%{attester: {old_ep0, old_ep1}}, %{attester: {ep0, ep1}}) do
old_subnets = MapSet.new([old_ep0.subnet_id, old_ep1.subnet_id])
new_subnets = MapSet.new([ep0.subnet_id, ep1.subnet_id])

# leave old subnets (except for recurring ones)
MapSet.difference(old_subnets, new_subnets) |> leave()
Expand All @@ -133,22 +134,8 @@ defmodule LambdaEthereumConsensus.Validator do
MapSet.difference(new_subnets, old_subnets) |> join()
end

defp join_subnets_for_duties(%{attester: {ep0, ep1}}, beacon_state, epoch) do
[subnet0] = compute_subnet_ids_for_duties([ep0], beacon_state, epoch)
[subnet1] = compute_subnet_ids_for_duties([ep1], beacon_state, epoch + 1)
join([subnet0, subnet1])
end

defp compute_subnet_ids_for_duties(duties, beacon_state, epoch) do
committees_per_slot = Accessors.get_committee_count_per_slot(beacon_state, epoch)
Enum.map(duties, &compute_subnet_id_for_duty(&1, committees_per_slot))
end

defp compute_subnet_id_for_duty(
%{committee_index: committee_index, slot: slot},
committees_per_slot
) do
Utils.compute_subnet_for_attestation(committees_per_slot, slot, committee_index)
defp join_subnets_for_duties(%{attester: {ep0, ep1}}) do
join([ep0.subnet_id, ep1.subnet_id])
end

defp join(subnets) do
Expand Down Expand Up @@ -186,9 +173,24 @@ defmodule LambdaEthereumConsensus.Validator do

Logger.info("[Validator] Attesting in slot #{attestation.data.slot} on subnet #{subnet_id}")
Gossip.Attestation.publish(subnet_id, attestation)
:ok

if current_duty.is_aggregator do
Logger.info("[Validator] Collecting messages for future aggregation...")
Gossip.Attestation.collect(subnet_id, attestation)
end
end

def maybe_publish_aggregate(
%{duties: %{attester: {%{slot: duty_slot, is_aggregator: true} = duty, _}}},
slot
)
when duty_slot == slot + 1 do
# TODO: generate aggregate and publish
_ = Gossip.Attestation.stop_collecting(duty.subnet_id)
end

def maybe_publish_aggregate(_, _), do: :ok

defp produce_attestation(duty, head_root, privkey) do
%{
index_in_committee: index_in_committee,
Expand Down Expand Up @@ -230,8 +232,7 @@ defmodule LambdaEthereumConsensus.Validator do
signature: signature
}

[subnet_id] = compute_subnet_ids_for_duties([duty], head_state, head_epoch)
{subnet_id, attestation}
{duty.subnet_id, attestation}
end

defp process_slots(%{slot: old_slot} = state, slot) when old_slot == slot, do: state
Expand All @@ -246,4 +247,13 @@ defmodule LambdaEthereumConsensus.Validator do
|> Utils.aggregator?(duty.committee_length)
|> then(&Map.put(duty, :is_aggregator, &1))
end

defp update_with_subnet_id(duty, beacon_state, epoch) do
committees_per_slot = Accessors.get_committee_count_per_slot(beacon_state, epoch)

subnet_id =
Utils.compute_subnet_for_attestation(committees_per_slot, duty.slot, duty.committee_index)

Map.put(duty, :subnet_id, subnet_id)
end
end
Loading