Skip to content

Commit

Permalink
feat: sync committee contribution (#1284)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigo-o authored Sep 10, 2024
1 parent 8af3687 commit 2cf6ef5
Show file tree
Hide file tree
Showing 15 changed files with 795 additions and 155 deletions.
10 changes: 6 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ KURTOSIS_GRAFANA_DASHBOARDS_DIR ?= $(KURTOSIS_DIR)/static_files/grafana-config/d
KURTOSIS_COOKIE ?= secret
# Name of the kurtosis service pointing to the lambdaconsesus node
KURTOSIS_SERVICE ?= cl-3-lambda-geth
# Name of the enclave to be used with kurtosis
KURTOSIS_ENCLAVE ?= lambdanet

##### TARGETS #####

Expand All @@ -75,18 +77,18 @@ kurtosis.setup.lambdaconsensus:

#💻 kurtosis.start: @ Starts the kurtosis environment
kurtosis.start:
kurtosis run --enclave lambdanet $(KURTOSIS_DIR) --args-file network_params.yaml
kurtosis run --enclave $(KURTOSIS_ENCLAVE) $(KURTOSIS_DIR) --args-file network_params.yaml

#💻 kurtosis.build-and-start: @ Builds the lambdaconsensus Docker image and starts the kurtosis environment.
kurtosis.clean-start: kurtosis.clean kurtosis.setup.lambdaconsensus kurtosis.start

#💻 kurtosis.stop: @ Stops the kurtosis environment
kurtosis.stop:
kurtosis enclave stop lambdanet
kurtosis enclave stop $(KURTOSIS_ENCLAVE)

#💻 kurtosis.remove: @ Removes the kurtosis environment
kurtosis.remove:
kurtosis enclave rm lambdanet
kurtosis enclave rm $(KURTOSIS_ENCLAVE)

#💻 kurtosis.clean: @ Clean the kurtosis environment
kurtosis.clean:
Expand All @@ -97,7 +99,7 @@ kurtosis.purge: kurtosis.stop kurtosis.remove kurtosis.clean

#💻 kurtosis.connect: @ Connects to the client running in kurtosis, KURTOSIS_SERVICE could be given
kurtosis.connect:
kurtosis service shell lambdanet $(KURTOSIS_SERVICE)
kurtosis service shell $(KURTOSIS_ENCLAVE) $(KURTOSIS_SERVICE)

#💻 kurtosis.connect.iex: @ Connects to iex ONCE INSIDE THE KURTOSIS SERVICE
kurtosis.connect.iex:
Expand Down
10 changes: 5 additions & 5 deletions lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
alias LambdaEthereumConsensus.P2P
alias LambdaEthereumConsensus.P2P.Gossip.Handler
alias LambdaEthereumConsensus.StateTransition.Misc
alias Types.SubnetInfo
alias Types.AttSubnetInfo

@behaviour Handler

Expand All @@ -36,7 +36,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
# TODO: validate before accepting
Libp2pPort.validate_message(msg_id, :accept)

SubnetInfo.add_attestation!(subnet_id, attestation)
AttSubnetInfo.add_attestation!(subnet_id, attestation)
else
{:error, _} -> Libp2pPort.validate_message(msg_id, :reject)
end
Expand Down Expand Up @@ -70,18 +70,18 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
@spec collect(non_neg_integer(), Types.Attestation.t()) :: :ok
def collect(subnet_id, attestation) do
join(subnet_id)
SubnetInfo.new_subnet_with_attestation(subnet_id, attestation)
AttSubnetInfo.new_subnet_with_attestation(subnet_id, attestation)
Libp2pPort.async_subscribe_to_topic(topic(subnet_id), __MODULE__)
end

@spec stop_collecting(non_neg_integer()) ::
{:ok, list(Types.Attestation.t())} | {:error, String.t()}
def stop_collecting(subnet_id) do
# TODO: implement some way to unsubscribe without leaving the topic
# TODO: (#1289) implement some way to unsubscribe without leaving the topic
topic = topic(subnet_id)
Libp2pPort.leave_topic(topic)
Libp2pPort.join_topic(topic)
SubnetInfo.stop_collecting(subnet_id)
AttSubnetInfo.stop_collecting(subnet_id)
end

defp topic(subnet_id) do
Expand Down
103 changes: 101 additions & 2 deletions lib/lambda_ethereum_consensus/p2p/gossip/sync_committee.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,125 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.SyncCommittee do
@moduledoc """
This module handles sync committee from specific gossip subnets.
Used by validators to fulfill aggregation duties.
TODO: This module borrows almost all of its logic from Attestation,
this could be refactored to a common module if needed in the future.
"""
alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.Libp2pPort
alias LambdaEthereumConsensus.P2P
alias LambdaEthereumConsensus.P2P.Gossip.Handler
alias LambdaEthereumConsensus.StateTransition.Misc
alias Types.SyncSubnetInfo

@behaviour Handler

require Logger

@spec join([non_neg_integer()]) :: :ok
def join(subnet_ids) when is_list(subnet_ids) do
for subnet_id <- subnet_ids do
topic = topic(subnet_id)
Libp2pPort.join_topic(topic)

P2P.Metadata.set_syncnet(subnet_id)
end

P2P.Metadata.get_metadata()
|> update_enr()
end

@impl true
def handle_gossip_message(store, topic, msg_id, message) do
handle_gossip_message(topic, msg_id, message)
store
end

def handle_gossip_message(topic, msg_id, message) do
subnet_id = extract_subnet_id(topic)

with {:ok, uncompressed} <- :snappyer.decompress(message),
{:ok, sync_committee_msg} <- Ssz.from_ssz(uncompressed, Types.SyncCommitteeMessage) do
# TODO: validate before accepting
Libp2pPort.validate_message(msg_id, :accept)

SyncSubnetInfo.add_message!(subnet_id, sync_committee_msg)
else
{:error, _} -> Libp2pPort.validate_message(msg_id, :reject)
end
end

@spec publish(Types.SyncCommitteeMessage.t(), [non_neg_integer()]) :: :ok
def publish(%Types.SyncCommitteeMessage{} = sync_committee_msg, subnet_ids) do
Enum.each(subnet_ids, fn subnet_id ->
for subnet_id <- subnet_ids do
topic = topic(subnet_id)

{:ok, encoded} = SszEx.encode(sync_committee_msg, Types.SyncCommitteeMessage)
{:ok, message} = :snappyer.compress(encoded)
Libp2pPort.publish(topic, message)
end)
end

:ok
end

@spec publish_contribution(Types.SignedContributionAndProof.t()) :: :ok
def publish_contribution(%Types.SignedContributionAndProof{} = signed_contribution) do
fork_context = ForkChoice.get_fork_digest() |> Base.encode16(case: :lower)
topic = "/eth2/#{fork_context}/sync_committee_contribution_and_proof/ssz_snappy"
{:ok, encoded} = SszEx.encode(signed_contribution, Types.SignedContributionAndProof)
{:ok, message} = :snappyer.compress(encoded)
Libp2pPort.publish(topic, message)
end

@spec collect([non_neg_integer()], Types.SyncCommitteeMessage.t()) :: :ok
def collect(subnet_ids, message) do
join(subnet_ids)

for subnet_id <- subnet_ids do
SyncSubnetInfo.new_subnet_with_message(subnet_id, message)
Libp2pPort.async_subscribe_to_topic(topic(subnet_id), __MODULE__)
end

:ok
end

@spec stop_collecting(non_neg_integer()) ::
{:ok, list(Types.SyncCommitteeMessage.t())} | {:error, String.t()}
def stop_collecting(subnet_id) do
# TODO: (#1289) implement some way to unsubscribe without leaving the topic
topic = topic(subnet_id)
Libp2pPort.leave_topic(topic)
Libp2pPort.join_topic(topic)
SyncSubnetInfo.stop_collecting(subnet_id)
end

defp topic(subnet_id) do
# TODO: this doesn't take into account fork digest changes
fork_context = ForkChoice.get_fork_digest() |> Base.encode16(case: :lower)
"/eth2/#{fork_context}/sync_committee_#{subnet_id}/ssz_snappy"
end

defp update_enr(%{attnets: attnets, syncnets: syncnets}) do
enr_fork_id = compute_enr_fork_id()
Libp2pPort.update_enr(enr_fork_id, attnets, syncnets)
end

defp compute_enr_fork_id() do
current_version = ForkChoice.get_fork_version()

fork_digest =
Misc.compute_fork_digest(current_version, ChainSpec.get_genesis_validators_root())

%Types.EnrForkId{
fork_digest: fork_digest,
next_fork_version: current_version,
next_fork_epoch: Constants.far_future_epoch()
}
end

@subnet_id_start byte_size("/eth2/00000000/sync_committee_")

defp extract_subnet_id(<<_::binary-size(@subnet_id_start)>> <> id_with_trailer) do
id_with_trailer |> String.trim_trailing("/ssz_snappy") |> String.to_integer()
end
end
22 changes: 22 additions & 0 deletions lib/lambda_ethereum_consensus/state_transition/accessors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,28 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do

@max_random_byte 2 ** 8 - 1

@doc """
Compute the correct sync committee for a given `epoch`.
"""
def get_sync_committee_for_epoch!(%BeaconState{} = state, epoch) do
sync_committee_period = Misc.compute_sync_committee_period(epoch)
current_epoch = get_current_epoch(state)
current_sync_committee_period = Misc.compute_sync_committee_period(current_epoch)
next_sync_committee_period = current_sync_committee_period + 1

case sync_committee_period do
^current_sync_committee_period ->
state.current_sync_committee

^next_sync_committee_period ->
state.next_sync_committee

_ ->
raise ArgumentError,
"Invalid epoch #{epoch}, should be in the current or next sync committee period"
end
end

@doc """
Return the next sync committee, with possible pubkey duplicates.
"""
Expand Down
5 changes: 5 additions & 0 deletions lib/lambda_ethereum_consensus/state_transition/misc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,11 @@ defmodule LambdaEthereumConsensus.StateTransition.Misc do
div(epoch, ChainSpec.get("EPOCHS_PER_SYNC_COMMITTEE_PERIOD"))
end

@spec sync_subcommittee_size() :: Types.uint64()
def sync_subcommittee_size() do
div(ChainSpec.get("SYNC_COMMITTEE_SIZE"), Constants.sync_committee_subnet_count())
end

@doc """
Return the 32-byte fork data root for the ``current_version`` and ``genesis_validators_root``.
This is used primarily in signature domains to avoid collisions across forks/chains.
Expand Down
Loading

0 comments on commit 2cf6ef5

Please sign in to comment.