Skip to content

Commit

Permalink
Comments addressed
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigo-o committed Sep 10, 2024
1 parent f82adb2 commit 0e2fa72
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 50 deletions.
2 changes: 1 addition & 1 deletion lib/lambda_ethereum_consensus/p2p/gossip/attestation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.Attestation do
@spec stop_collecting(non_neg_integer()) ::
{:ok, list(Types.Attestation.t())} | {:error, String.t()}
def stop_collecting(subnet_id) do
# TODO: implement some way to unsubscribe without leaving the topic
# TODO: (#1289) implement some way to unsubscribe without leaving the topic
topic = topic(subnet_id)
Libp2pPort.leave_topic(topic)
Libp2pPort.join_topic(topic)
Expand Down
5 changes: 2 additions & 3 deletions lib/lambda_ethereum_consensus/p2p/gossip/sync_committee.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,9 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.SyncCommittee do
end

@spec stop_collecting(non_neg_integer()) ::
{:ok, list(Types.Attestation.t())} | {:error, String.t()}
{:ok, list(Types.SyncCommitteeMessage.t())} | {:error, String.t()}
def stop_collecting(subnet_id) do
# TODO from Attestation: implement some way to unsubscribe without leaving the topic
# TODO: This handle individual subnet_id while the other ones handle lists.
# TODO: (#1289) implement some way to unsubscribe without leaving the topic
topic = topic(subnet_id)
Libp2pPort.leave_topic(topic)
Libp2pPort.join_topic(topic)
Expand Down
33 changes: 18 additions & 15 deletions lib/lambda_ethereum_consensus/validator/duties.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
aggregation: [sync_committee_aggregator_duty()]
}

@type misc_data :: %{sync_subcommittee_participants: %{}}
@typedoc "Useful precalculated data not tied to a particular slot/duty."
@type shared_data_for_duties :: %{sync_subcommittee_participants: %{}}

@type attester_duties :: [attester_duty()]
@type proposer_duties :: [proposer_duty()]
Expand All @@ -51,13 +52,13 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
@type proposer_duties_per_slot :: %{Types.slot() => proposer_duties()}
@type sync_committee_duties_per_slot :: %{Types.slot() => sync_committee_duties()}

@type kind :: :proposers | :attesters | :sync_committees | :misc_data
@type kind :: :proposers | :attesters | :sync_committees | :shared
@type duties :: %{
kind() =>
attester_duties_per_slot()
| proposer_duties_per_slot()
| sync_committee_duties_per_slot()
| misc_data()
| shared_data_for_duties()
}

############################
Expand All @@ -84,26 +85,26 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
new_proposers = compute_proposers_for_epoch(beacon, epoch, validators)
new_attesters = compute_attesters_for_epoch(beacon, epoch, validators)

{new_sync_committees, new_misc_data} =
{new_sync_committees, sync_subcommittee_participants} =
case sync_committee_compute_check(epoch, {last_epoch, Map.get(duties_map, last_epoch)}) do
{:already_computed, sync_committee_duties} ->
sync_committee_duties
|> recompute_sync_committee_duties(beacon, epoch, validators)
|> then(&{&1, misc_data(duties_map, last_epoch)})
|> then(&{&1, sync_subcommittee_participants(duties_map, last_epoch)})

{:not_computed, period} ->
Logger.debug("[Duties] Computing sync committees for period: #{period}.")

beacon
|> compute_sync_committee_duties(epoch, validators)
|> then(&{&1, compute_misc_data(beacon, epoch)})
|> then(&{&1, compute_sync_subcommittee_participants(beacon, epoch)})
end

new_duties = %{
proposers: new_proposers,
attesters: new_attesters,
sync_committees: new_sync_committees,
misc_data: new_misc_data
shared: %{sync_subcommittee_participants: sync_subcommittee_participants}
}

log_duties_for_epoch(new_duties, epoch)
Expand All @@ -125,12 +126,11 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
end
end

@spec compute_misc_data(BeaconState.t(), Types.epoch()) :: misc_data()
defp compute_misc_data(beacon, epoch) do
%{
sync_subcommittee_participants: Utils.sync_subcommittee_participants(beacon, epoch)
}
end
@spec compute_sync_subcommittee_participants(BeaconState.t(), Types.epoch()) :: %{
non_neg_integer() => [non_neg_integer()]
}
defp compute_sync_subcommittee_participants(beacon, epoch),
do: Utils.sync_subcommittee_participants(beacon, epoch)

defp sync_committee_compute_check(epoch, {_last_epoch, nil}),
do: {:not_computed, Misc.compute_sync_committee_period(epoch)}
Expand Down Expand Up @@ -347,8 +347,11 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
end
end

@spec misc_data(duties(), Types.epoch()) :: misc_data()
def misc_data(duties, epoch), do: get_in(duties, [epoch, :misc_data]) || %{}
@spec sync_subcommittee_participants(duties(), Types.epoch()) :: %{
non_neg_integer() => [non_neg_integer()]
}
def sync_subcommittee_participants(duties, epoch),
do: get_in(duties, [epoch, :shared, :sync_subcommittee_participants]) || %{}

defp sync_committee(duties, epoch, slot),
do: get_in(duties, [epoch, :sync_committees, slot]) || []
Expand Down
2 changes: 1 addition & 1 deletion lib/lambda_ethereum_consensus/validator/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ defmodule LambdaEthereumConsensus.Validator.Utils do
- For subcommittee 1, validator 2 is at index 0 and 2, validator 0 is at index 1
"""
@spec sync_subcommittee_participants(BeaconState.t(), Types.epoch()) ::
%{non_neg_integer() => [Bls.pubkey()]}
%{non_neg_integer() => [non_neg_integer()]}
def sync_subcommittee_participants(state, epoch) do
state
|> Accessors.get_sync_committee_for_epoch!(epoch)
Expand Down
8 changes: 5 additions & 3 deletions lib/lambda_ethereum_consensus/validator/validator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ defmodule LambdaEthereumConsensus.Validator do
@spec new(Keystore.t(), Types.slot(), Types.root()) :: t()
def new(keystore, head_slot, head_root) do
epoch = Misc.compute_epoch_at_slot(head_slot)
# TODO: This should be handled in the ValidatorSet instead, part of #1281
# TODO: (#1281) This should be handled in the ValidatorSet instead
beacon = ValidatorSet.fetch_target_state_and_go_to_slot(epoch, head_slot, head_root)

new(keystore, beacon)
Expand Down Expand Up @@ -102,6 +102,7 @@ defmodule LambdaEthereumConsensus.Validator do
@spec publish_aggregate(t(), Duties.attester_duty(), Types.slot()) ::
:ok
def publish_aggregate(%{index: validator_index, keystore: keystore}, duty, slot) do
# TODO: (#1286) after stop collecting for the first validator in a slot the others are not able to publish
case Gossip.Attestation.stop_collecting(duty.subnet_id) do
{:ok, attestations} ->
log_md = [slot: slot, attestations: attestations]
Expand All @@ -120,7 +121,7 @@ defmodule LambdaEthereumConsensus.Validator do
end

defp aggregate_attestations(attestations) do
# TODO: We need to check why we are producing duplicate attestations, this was generating invalid signatures
# TODO: (#1254) We need to check why we are producing duplicate attestations, this was generating invalid signatures
unique_attestations = attestations |> Enum.uniq()

aggregation_bits =
Expand Down Expand Up @@ -263,6 +264,7 @@ defmodule LambdaEthereumConsensus.Validator do
sync_subcommittee_participants,
slot
) do
# TODO: (#1286) after stop collecting for the first validator in a slot the others are not able to publish
for %{subcommittee_index: subnet_id} = aggregation_duty <- duty.aggregation do
case Gossip.SyncCommittee.stop_collecting(subnet_id) do
{:ok, messages} ->
Expand Down Expand Up @@ -308,7 +310,7 @@ defmodule LambdaEthereumConsensus.Validator do
end

defp aggregate_sync_committee_signature(messages, indices_in_subcommittee) do
# TODO: as with attestations, we need to check why we recieve duplicate sync messages
# TODO: (#1254) as with attestations, we need to check why we recieve duplicate sync messages
unique_messages = messages |> Enum.uniq()

signatures_to_aggregate =
Expand Down
5 changes: 3 additions & 2 deletions lib/lambda_ethereum_consensus/validator/validator_set.ex
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,14 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
defp maybe_publish_sync_aggregates(set, slot) do
# Sync committee is broadcasted for the next slot, so we take the duties for the correct epoch.
epoch = Misc.compute_epoch_at_slot(slot + 1)
%{sync_subcommittee_participants: participants} = Duties.misc_data(set.duties, epoch)

case Duties.current_sync_aggregators(set.duties, epoch, slot) do
[] ->
set

aggregator_duties ->
participants = Duties.sync_subcommittee_participants(set.duties, epoch)

aggregator_duties
|> Enum.map(&publish_sync_aggregate(&1, participants, slot, set.validators))
|> update_duties(set, epoch, :sync_committees, slot)
Expand Down Expand Up @@ -286,7 +287,7 @@ defmodule LambdaEthereumConsensus.ValidatorSet do

##########################
# Target State

# TODO: (#1278) This should be taken from the store as noted by arkenan.
@spec fetch_target_state_and_go_to_slot(Types.epoch(), Types.slot(), Types.root()) ::
Types.BeaconState.t()
def fetch_target_state_and_go_to_slot(epoch, slot, root) do
Expand Down
28 changes: 3 additions & 25 deletions test/unit/sync_subnet_info.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,7 @@ defmodule Unit.SyncSubnetInfoTest do
test "stop collecting with one attestation" do
subnet_id = 1

expected_message = %SyncCommitteeMessage{
slot: 5_057_010_135_270_197_978,
beacon_block_root:
<<31, 38, 101, 174, 248, 168, 116, 226, 15, 39, 218, 148, 42, 8, 80, 80, 241, 149, 162,
32, 176, 208, 120, 120, 89, 123, 136, 115, 154, 28, 21, 174>>,
validator_index: 0,
signature: <<>>
}
expected_message = sync_committee_message()

SyncSubnetInfo.new_subnet_with_message(subnet_id, sync_committee_message())

Expand All @@ -49,23 +42,8 @@ defmodule Unit.SyncSubnetInfoTest do
test "stop collecting with two attestations" do
subnet_id = 1

expected_message_1 = %SyncCommitteeMessage{
slot: 5_057_010_135_270_197_978,
beacon_block_root:
<<31, 38, 101, 174, 248, 168, 116, 226, 15, 39, 218, 148, 42, 8, 80, 80, 241, 149, 162,
32, 176, 208, 120, 120, 89, 123, 136, 115, 154, 28, 21, 174>>,
validator_index: 1,
signature: <<>>
}

expected_message_2 = %SyncCommitteeMessage{
slot: 5_057_010_135_270_197_978,
beacon_block_root:
<<31, 38, 101, 174, 248, 168, 116, 226, 15, 39, 218, 148, 42, 8, 80, 80, 241, 149, 162,
32, 176, 208, 120, 120, 89, 123, 136, 115, 154, 28, 21, 174>>,
validator_index: 2,
signature: <<>>
}
expected_message_1 = sync_committee_message(1)
expected_message_2 = sync_committee_message(2)

SyncSubnetInfo.new_subnet_with_message(subnet_id, sync_committee_message(1))

Expand Down

0 comments on commit 0e2fa72

Please sign in to comment.