Skip to content

Commit

Permalink
feat: collect attestations during slot
Browse files Browse the repository at this point in the history
  • Loading branch information
MegaRedHand committed Mar 18, 2024
1 parent 1f1f9eb commit 3bf22e5
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 41 deletions.
15 changes: 8 additions & 7 deletions lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
}
end

def collect(subnet_id) do
GenServer.call(__MODULE__, {:collect, subnet_id})
def collect(subnet_id, attestation_data) do
GenServer.call(__MODULE__, {:collect, subnet_id, attestation_data})
join(subnet_id)
end

Expand All @@ -77,20 +77,20 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do

@impl true
def init(_init_arg) do
{:ok, %{attnets: MapSet.new(), attestations: %{}}}
{:ok, %{attnets: %{}, attestations: %{}}}
end

@impl true
def handle_call({:collect, subnet_id}, _from, state) do
new_state = %{state | attnets: MapSet.put(state.attnets, subnet_id)}
def handle_call({:collect, subnet_id, attestation_data}, _from, state) do
new_state = %{state | attnets: Map.put(state.attnets, subnet_id, attestation_data)}
{:reply, :ok, new_state}
end

def handle_call({:stop_collecting, subnet_id}, _from, state) do
if MapSet.member?(state.attnets, subnet_id) do
if Map.has_key?(state.attnets, subnet_id) do
{collected, atts} = Map.pop(state.attestations, subnet_id, [])
flush_remaining(subnet_id)
new_state = %{state | attnets: MapSet.delete(state.attnets, subnet_id), attestations: atts}
new_state = %{state | attnets: Map.delete(state.attnets, subnet_id), attestations: atts}
{:reply, {:ok, collected}, new_state}
else
{:reply, {:error, "subnet not joined"}, state}
Expand All @@ -117,6 +117,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
end

defp store_attestation(subnet_id, %{attestations: attestations} = state, attestation) do
# TODO: compare attestation with attestation_data
if Map.has_key?(attestation, subnet_id) do
attestations = Map.update(attestations, subnet_id, [], &[attestation | &1])
%{state | attestations: attestations}
Expand Down
80 changes: 46 additions & 34 deletions lib/lambda_ethereum_consensus/validator/validator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ defmodule LambdaEthereumConsensus.Validator do
epoch = Misc.compute_epoch_at_slot(slot)
beacon = fetch_target_state(epoch, root)
duties = maybe_update_duties(state.duties, beacon, epoch, state.validator)
join_subnets_for_duties(duties, beacon, epoch)
join_subnets_for_duties(duties)
log_duties(duties, state.validator.index)
{:noreply, %{state | duties: duties}}
end
Expand All @@ -51,6 +51,7 @@ defmodule LambdaEthereumConsensus.Validator do
new_state = update_state(state, slot, head_root)

if should_attest?(state, slot), do: attest(state)
maybe_publish_aggregate(state, slot)

{:noreply, new_state}
end
Expand All @@ -71,7 +72,7 @@ defmodule LambdaEthereumConsensus.Validator do
shift_duties(state.duties, epoch, last_epoch)
|> maybe_update_duties(new_beacon, epoch, state.validator)

move_subnets(state.duties, new_duties, new_beacon, epoch)
move_subnets(state.duties, new_duties)
log_duties(new_duties, state.validator.index)

%{state | slot: slot, root: head_root, duties: new_duties}
Expand Down Expand Up @@ -109,22 +110,22 @@ defmodule LambdaEthereumConsensus.Validator do

defp maybe_update_attester_duties(duties, _, _, _), do: duties

defp compute_attester_duty(duties, index, beacon_state, epoch, validator) when index in 0..1 do
defp compute_attester_duty(duties, index, beacon_state, base_epoch, validator)
when index in 0..1 do
epoch = base_epoch + index
# Can't fail
{:ok, duty} = Utils.get_committee_assignment(beacon_state, epoch + index, validator.index)
duty = update_with_aggregation_duty(duty, beacon_state, validator.privkey)
put_elem(duties, index, duty)
end
{:ok, duty} = Utils.get_committee_assignment(beacon_state, epoch, validator.index)

defp move_subnets(%{attester: {old_ep0, old_ep1}}, %{attester: {ep0, ep1}}, beacon_state, epoch) do
[old_subnet0, new_subnet0] =
compute_subnet_ids_for_duties([old_ep0, ep0], beacon_state, epoch)
duty =
update_with_aggregation_duty(duty, beacon_state, validator.privkey)
|> update_with_subnet_id(beacon_state, epoch)

[old_subnet1, new_subnet1] =
compute_subnet_ids_for_duties([old_ep1, ep1], beacon_state, epoch + 1)
put_elem(duties, index, duty)
end

old_subnets = MapSet.new([old_subnet0, old_subnet1])
new_subnets = MapSet.new([new_subnet0, new_subnet1])
defp move_subnets(%{attester: {old_ep0, old_ep1}}, %{attester: {ep0, ep1}}) do

Check warning on line 126 in lib/lambda_ethereum_consensus/validator/validator.ex

View workflow job for this annotation

GitHub Actions / Build project

variable "ep0" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 126 in lib/lambda_ethereum_consensus/validator/validator.ex

View workflow job for this annotation

GitHub Actions / Build project

variable "ep1" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 126 in lib/lambda_ethereum_consensus/validator/validator.ex

View workflow job for this annotation

GitHub Actions / Test

variable "ep0" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 126 in lib/lambda_ethereum_consensus/validator/validator.ex

View workflow job for this annotation

GitHub Actions / Test

variable "ep1" is unused (if the variable is not meant to be used, prefix it with an underscore)
old_subnets = MapSet.new([old_ep0.subnet_id, old_ep1.subnet_id])
new_subnets = MapSet.new([new_ep0.subnet_id, new_ep1.subnet_id])

# leave old subnets (except for recurring ones)
MapSet.difference(old_subnets, new_subnets) |> leave()
Expand All @@ -133,22 +134,8 @@ defmodule LambdaEthereumConsensus.Validator do
MapSet.difference(new_subnets, old_subnets) |> join()
end

defp join_subnets_for_duties(%{attester: {ep0, ep1}}, beacon_state, epoch) do
[subnet0] = compute_subnet_ids_for_duties([ep0], beacon_state, epoch)
[subnet1] = compute_subnet_ids_for_duties([ep1], beacon_state, epoch + 1)
join([subnet0, subnet1])
end

defp compute_subnet_ids_for_duties(duties, beacon_state, epoch) do
committees_per_slot = Accessors.get_committee_count_per_slot(beacon_state, epoch)
Enum.map(duties, &compute_subnet_id_for_duty(&1, committees_per_slot))
end

defp compute_subnet_id_for_duty(
%{committee_index: committee_index, slot: slot},
committees_per_slot
) do
Utils.compute_subnet_for_attestation(committees_per_slot, slot, committee_index)
defp join_subnets_for_duties(%{attester: {ep0, ep1}}) do
join([ep0.subnet_id, ep1.subnet_id])
end

defp join(subnets) do
Expand Down Expand Up @@ -184,11 +171,28 @@ defmodule LambdaEthereumConsensus.Validator do
{subnet_id, attestation} =
produce_attestation(current_duty, state.root, state.validator.privkey)

Logger.info("[Validator] Attesting in slot #{attestation.data.slot} on subnet #{subnet_id}")
Gossip.Attestation.publish(subnet_id, attestation)
if current_duty.is_aggregator do
Logger.info("[Validator] Aggregating on subnet #{subnet_id}")
Gossip.Attestation.collect(subnet_id, attestation)
else
Logger.info("[Validator] Attesting in slot #{attestation.data.slot} on subnet #{subnet_id}")
Gossip.Attestation.publish(subnet_id, attestation)
end

:ok
end

def maybe_publish_aggregate(
%{duties: %{attester: {%{slot: duty_slot, is_aggregator: true} = duty, _}}},
slot
)
when duty_slot == slot + 1 do
# TODO: generate aggregate and publish
_ = Gossip.Attestation.stop_collecting(duty.subnet_id)
end

def maybe_publish_aggregate(_, _), do: :ok

defp produce_attestation(duty, head_root, privkey) do
%{
index_in_committee: index_in_committee,
Expand Down Expand Up @@ -230,8 +234,7 @@ defmodule LambdaEthereumConsensus.Validator do
signature: signature
}

[subnet_id] = compute_subnet_ids_for_duties([duty], head_state, head_epoch)
{subnet_id, attestation}
{duty.subnet_id, attestation}
end

defp process_slots(%{slot: old_slot} = state, slot) when old_slot == slot, do: state
Expand All @@ -246,4 +249,13 @@ defmodule LambdaEthereumConsensus.Validator do
|> Utils.aggregator?(duty.committee_length)
|> then(&Map.put(duty, :is_aggregator, &1))
end

defp update_with_subnet_id(duty, beacon_state, epoch) do
committees_per_slot = Accessors.get_committee_count_per_slot(beacon_state, epoch)

subnet_id =
Utils.compute_subnet_for_attestation(committees_per_slot, duty.slot, duty.committee_index)

Map.put(duty, :subnet_id, subnet_id)
end
end

0 comments on commit 3bf22e5

Please sign in to comment.