Skip to content

Commit

Permalink
feat: construct and publish aggregated attestations (#892)
Browse files Browse the repository at this point in the history
  • Loading branch information
MegaRedHand authored Mar 20, 2024
1 parent 35151c1 commit 9ff447a
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 28 deletions.
51 changes: 34 additions & 17 deletions lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
end

@spec join(non_neg_integer()) :: :ok
def join(subnet_id) do
topic = get_topic_name(subnet_id)
Libp2pPort.join_topic(topic)
Expand All @@ -25,6 +26,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
update_enr()
end

@spec leave(non_neg_integer()) :: :ok
def leave(subnet_id) do
topic = get_topic_name(subnet_id)
Libp2pPort.leave_topic(topic)
Expand All @@ -33,13 +35,35 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
update_enr()
end

@spec publish(non_neg_integer(), Types.Attestation.t()) :: :ok
def publish(subnet_id, %Types.Attestation{} = attestation) do
topic = get_topic_name(subnet_id)
{:ok, encoded} = SszEx.encode(attestation, Types.Attestation)
{:ok, message} = :snappyer.compress(encoded)
Libp2pPort.publish(topic, message)
end

def publish_aggregate(%Types.SignedAggregateAndProof{} = signed_aggregate) do
fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower)
topic = "/eth2/#{fork_context}/beacon_aggregate_and_proof/ssz_snappy"
{:ok, encoded} = SszEx.encode(signed_aggregate, Types.SignedAggregateAndProof)
{:ok, message} = :snappyer.compress(encoded)
Libp2pPort.publish(topic, message)
end

@spec collect(non_neg_integer(), Types.AttestationData.t()) :: :ok
def collect(subnet_id, attestation_data) do
GenServer.call(__MODULE__, {:collect, subnet_id, attestation_data})
join(subnet_id)
end

@spec stop_collecting(non_neg_integer()) ::
{:ok, list(Types.Attestation.t())} | {:error, String.t()}
def stop_collecting(subnet_id) do
leave(subnet_id)
GenServer.call(__MODULE__, {:stop_collecting, subnet_id})
end

defp get_topic_name(subnet_id) do
# TODO: this doesn't take into account fork digest changes
fork_context = BeaconChain.get_fork_digest() |> Base.encode16(case: :lower)
Expand All @@ -65,24 +89,16 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
}
end

def collect(subnet_id, attestation_data) do
GenServer.call(__MODULE__, {:collect, subnet_id, attestation_data})
join(subnet_id)
end

def stop_collecting(subnet_id) do
leave(subnet_id)
GenServer.call(__MODULE__, {:stop_collecting, subnet_id})
end

@impl true
def init(_init_arg) do
{:ok, %{attnets: %{}, attestations: %{}}}
end

@impl true
def handle_call({:collect, subnet_id, attestation_data}, _from, state) do
new_state = %{state | attnets: Map.put(state.attnets, subnet_id, attestation_data)}
attestations = Map.put(state.attestations, subnet_id, [attestation_data])
attnets = Map.put(state.attnets, subnet_id, attestation_data)
new_state = %{state | attnets: attnets, attestations: attestations}
{:reply, :ok, new_state}
end

Expand Down Expand Up @@ -116,12 +132,13 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
end

defp store_attestation(subnet_id, %{attestations: attestations} = state, attestation) do
# TODO: compare attestation with attestation_data
if Map.has_key?(attestation, subnet_id) do
attestations = Map.update(attestations, subnet_id, [], &[attestation | &1])
%{state | attestations: attestations}
else
state
case Map.get(attestation, subnet_id) do
data when data !== attestation.data ->
state

_ ->
attestations = Map.update(attestations, subnet_id, [], &[attestation | &1])
%{state | attestations: attestations}
end
end
end
66 changes: 55 additions & 11 deletions lib/lambda_ethereum_consensus/validator/validator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule LambdaEthereumConsensus.Validator do
alias LambdaEthereumConsensus.StateTransition.Accessors
alias LambdaEthereumConsensus.StateTransition.Misc
alias LambdaEthereumConsensus.Store.BlockStates
alias LambdaEthereumConsensus.Utils.BitField
alias LambdaEthereumConsensus.Utils.BitList
alias LambdaEthereumConsensus.Validator.Utils
alias Types.Attestation
Expand Down Expand Up @@ -176,21 +177,55 @@ defmodule LambdaEthereumConsensus.Validator do

if current_duty.is_aggregator do
Logger.info("[Validator] Collecting messages for future aggregation...")
Gossip.Attestation.collect(subnet_id, attestation)
Gossip.Attestation.collect(subnet_id, attestation.data)
end
end

def maybe_publish_aggregate(
%{duties: %{attester: {%{slot: duty_slot, is_aggregator: true} = duty, _}}},
slot
)
when duty_slot == slot + 1 do
# TODO: generate aggregate and publish
_ = Gossip.Attestation.stop_collecting(duty.subnet_id)
# We publish our aggregate on the next slot, and when we're an aggregator
# TODO: we should publish it two-thirds of the way through the slot
def maybe_publish_aggregate(%{duties: %{attester: {duty, _}}, validator: validator}, slot)
when duty.slot == slot + 1 and duty.is_aggregator do
case Gossip.Attestation.stop_collecting(duty.subnet_id) do
{:ok, attestations} ->
Logger.info("[Validator] Publishing aggregate of slot #{slot}")

aggregate_attestations(attestations)
|> append_proof(duty.selection_proof, validator)
|> append_signature(duty.signing_domain, validator)
|> Gossip.Attestation.publish_aggregate()

_ ->
Logger.error("[Validator] Failed to publish aggregate")
end
end

def maybe_publish_aggregate(_, _), do: :ok

defp aggregate_attestations(attestations) do
aggregation_bits =
attestations
|> Stream.map(&Map.fetch!(&1, :aggregation_bits))
|> Enum.reduce(&BitField.bitwise_or/2)

{:ok, signature} = attestations |> Enum.map(&Map.fetch!(&1, :signature)) |> Bls.aggregate()

%{List.first(attestations) | aggregation_bits: aggregation_bits, signature: signature}
end

defp append_proof(aggregate, proof, validator) do
%Types.AggregateAndProof{
aggregator_index: validator.index,
aggregate: aggregate,
selection_proof: proof
}
end

defp append_signature(aggregate_and_proof, signing_domain, %{privkey: privkey}) do
signing_root = Misc.compute_signing_root(aggregate_and_proof, signing_domain)
signature = Bls.sign(privkey, signing_root)
%Types.SignedAggregateAndProof{message: aggregate_and_proof, signature: signature}
end

defp produce_attestation(duty, head_root, privkey) do
%{
index_in_committee: index_in_committee,
Expand Down Expand Up @@ -243,9 +278,18 @@ defmodule LambdaEthereumConsensus.Validator do
end

defp update_with_aggregation_duty(duty, beacon_state, privkey) do
Utils.get_slot_signature(beacon_state, duty.slot, privkey)
|> Utils.aggregator?(duty.committee_length)
|> then(&Map.put(duty, :is_aggregator, &1))
proof = Utils.get_slot_signature(beacon_state, duty.slot, privkey)

if Utils.aggregator?(proof, duty.committee_length) do
epoch = Misc.compute_epoch_at_slot(duty.slot)
domain = Accessors.get_domain(beacon_state, Constants.domain_aggregate_and_proof(), epoch)

Map.put(duty, :is_aggregator, true)
|> Map.put(:selection_proof, proof)
|> Map.put(:signing_domain, domain)
else
Map.put(duty, :is_aggregator, false)
end
end

defp update_with_subnet_id(duty, beacon_state, epoch) do
Expand Down
11 changes: 11 additions & 0 deletions lib/utils/bit_field.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,15 @@ defmodule LambdaEthereumConsensus.Utils.BitField do
"""
@spec count(t) :: non_neg_integer()
def count(bit_field), do: for(<<bit::1 <- bit_field>>, do: bit) |> Enum.sum()

@doc """
Receives two bitfields and returns the OR of both.
"""
@spec bitwise_or(t, t) :: t
def bitwise_or(left, right) when bit_size(left) == bit_size(right) do
size = bit_size(left)
left_int = :binary.decode_unsigned(left)
right_int = :binary.decode_unsigned(right)
<<Bitwise.bor(left_int, right_int)::size(size)>>
end
end

0 comments on commit 9ff447a

Please sign in to comment.