Skip to content

Commit

Permalink
Optimize process_sync_aggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
MegaRedHand committed Nov 29, 2023
1 parent c5e1a18 commit 1258d0b
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 102 deletions.
6 changes: 2 additions & 4 deletions lib/lambda_ethereum_consensus/p2p/gossip_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule LambdaEthereumConsensus.P2P.GossipHandler do

alias LambdaEthereumConsensus.Beacon.PendingBlocks
alias LambdaEthereumConsensus.ForkChoice.Store
alias LambdaEthereumConsensus.Utils.BitVector
alias SszTypes.{AggregateAndProof, SignedAggregateAndProof, SignedBeaconBlock}

@spec handle_message(String.t(), struct) :: :ok
Expand All @@ -31,7 +32,7 @@ defmodule LambdaEthereumConsensus.P2P.GossipHandler do
"/eth2/bba4da96/beacon_aggregate_and_proof/ssz_snappy",
%SignedAggregateAndProof{message: %AggregateAndProof{aggregate: aggregate}}
) do
votes = count_bits(aggregate.aggregation_bits)
votes = BitVector.count(aggregate.aggregation_bits)
slot = aggregate.data.slot
root = aggregate.data.beacon_block_root |> Base.encode16()

Expand All @@ -49,7 +50,4 @@ defmodule LambdaEthereumConsensus.P2P.GossipHandler do
|> then(&"[#{topic_name}] decoded: '#{&1}'")
|> Logger.debug()
end

defp count_bits(bitstring),
do: for(<<bit::1 <- bitstring>>, do: bit) |> Enum.sum()
end
28 changes: 13 additions & 15 deletions lib/lambda_ethereum_consensus/state_transition/accessors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do
"""
@spec get_active_validator_indices(BeaconState.t(), SszTypes.epoch()) ::
list(SszTypes.validator_index())
def get_active_validator_indices(%BeaconState{validators: validators} = _state, epoch) do
def get_active_validator_indices(%BeaconState{validators: validators}, epoch) do
validators
|> Stream.with_index()
|> Stream.filter(fn {v, _} ->
Expand Down Expand Up @@ -184,8 +184,13 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do
"""
@spec get_total_active_balance(BeaconState.t()) :: SszTypes.gwei()
def get_total_active_balance(state) do
active_validator_indices = get_active_validator_indices(state, get_current_epoch(state))
get_total_balance(state, active_validator_indices)
epoch = get_current_epoch(state)

state.validators
|> Stream.zip(state.balances)
|> Stream.filter(&Predicates.is_active_validator(elem(&1, 0), epoch))
|> Stream.map(&elem(&1, 1))
|> Enum.sum()
end

@doc """
Expand Down Expand Up @@ -227,23 +232,16 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do
Return the beacon proposer index at the current slot.
"""
@spec get_beacon_proposer_index(BeaconState.t()) ::
{:ok, SszTypes.validator_index()} | {:error, binary()}
{:ok, SszTypes.validator_index()} | {:error, String.t()}
def get_beacon_proposer_index(state) do
epoch = get_current_epoch(state)

seed =
:crypto.hash(
:sha256,
get_seed(state, epoch, Constants.domain_beacon_proposer()) <>
Misc.uint64_to_bytes(state.slot)
)

indices = get_active_validator_indices(state, epoch)

case Misc.compute_proposer_index(state, indices, seed) do
{:error, msg} -> {:error, msg}
{:ok, i} -> {:ok, i}
end
state
|> get_seed(epoch, Constants.domain_beacon_proposer())
|> then(&:crypto.hash(:sha256, &1 <> Misc.uint64_to_bytes(state.slot)))
|> then(&Misc.compute_proposer_index(state, indices, &1))
end

@doc """
Expand Down
150 changes: 67 additions & 83 deletions lib/lambda_ethereum_consensus/state_transition/operations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do
This module contains functions for handling state transition
"""

alias LambdaEthereumConsensus.StateTransition.{Accessors, Misc, Mutators, Predicates}
alias LambdaEthereumConsensus.Engine

Check warning on line 6 in lib/lambda_ethereum_consensus/state_transition/operations.ex

View workflow job for this annotation

GitHub Actions / Test

unused alias Engine

Check warning on line 6 in lib/lambda_ethereum_consensus/state_transition/operations.ex

View workflow job for this annotation

GitHub Actions / Build project

unused alias Engine

Check warning on line 6 in lib/lambda_ethereum_consensus/state_transition/operations.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (general)

unused alias Engine

Check warning on line 6 in lib/lambda_ethereum_consensus/state_transition/operations.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (general)

unused alias Engine

Check warning on line 6 in lib/lambda_ethereum_consensus/state_transition/operations.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (mainnet)

unused alias Engine

Check warning on line 6 in lib/lambda_ethereum_consensus/state_transition/operations.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (mainnet)

unused alias Engine

Check warning on line 6 in lib/lambda_ethereum_consensus/state_transition/operations.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (minimal)

unused alias Engine

Check warning on line 6 in lib/lambda_ethereum_consensus/state_transition/operations.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (minimal)

unused alias Engine
alias LambdaEthereumConsensus.StateTransition.{Accessors, Math, Misc, Mutators, Predicates}
alias LambdaEthereumConsensus.Utils.BitVector
alias SszTypes.BeaconBlockBody

Expand Down Expand Up @@ -109,80 +110,90 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do

@spec process_sync_aggregate(BeaconState.t(), SyncAggregate.t()) ::
{:ok, BeaconState.t()} | {:error, String.t()}
def process_sync_aggregate(
%BeaconState{
slot: slot,
current_sync_committee: current_sync_committee,
validators: validators
} = state,
%SyncAggregate{
sync_committee_bits: sync_committee_bits,
sync_committee_signature: sync_committee_signature
}
) do
def process_sync_aggregate(%BeaconState{} = state, %SyncAggregate{} = aggregate) do
# Verify sync committee aggregate signature signing over the previous slot block root
committee_pubkeys = current_sync_committee.pubkeys

# TODO: Change bitvectors to be in little-endian instead of converting manually
sync_committee_bits_as_num = sync_committee_bits |> :binary.decode_unsigned()

sync_committee_bits =
<<sync_committee_bits_as_num::unsigned-integer-little-size(bit_size(sync_committee_bits))>>
committee_pubkeys = state.current_sync_committee.pubkeys
sync_committee_bits = parse_sync_committee_bits(aggregate.sync_committee_bits)

participant_pubkeys =
Enum.with_index(committee_pubkeys)
committee_pubkeys
|> Enum.with_index()
|> Enum.filter(fn {_, index} -> BitVector.set?(sync_committee_bits, index) end)
|> Enum.map(fn {public_key, _} -> public_key end)

previous_slot = max(slot, 1) - 1
previous_slot = max(state.slot, 1) - 1
epoch = Misc.compute_epoch_at_slot(previous_slot)
domain = Accessors.get_domain(state, Constants.domain_sync_committee(), epoch)

with {:ok, block_root} <- Accessors.get_block_root_at_slot(state, previous_slot),
signing_root <- Misc.compute_signing_root(block_root, domain),
{:ok, true} <-
Bls.eth_fast_aggregate_verify(
participant_pubkeys,
signing_root,
sync_committee_signature
) do
:ok <-
verify_signature(participant_pubkeys, signing_root, aggregate.sync_committee_signature),
{:ok, proposer_index} <- Accessors.get_beacon_proposer_index(state) do
# Compute participant and proposer rewards
{participant_reward, proposer_reward} = compute_sync_aggregate_rewards(state)

total_proposer_reward = BitVector.count(sync_committee_bits) * proposer_reward

# PERF: make Map with committee_index by pubkey, then
# Enum.map validators -> new balance all in place, without map_reduce
committee_deltas =
state.validators
|> get_sync_committee_indices(committee_pubkeys)
|> Stream.with_index()
|> Stream.map(fn {validator_index, committee_index} ->
if BitVector.set?(sync_committee_bits, committee_index),
do: {validator_index, participant_reward},
else: {validator_index, -participant_reward}
end)
|> Enum.sort(fn {vi1, _}, {vi2, _} -> vi1 <= vi2 end)

# Apply participant and proposer rewards
committee_indices = get_sync_committee_indices(validators, committee_pubkeys)
{new_balances, []} =
state.balances
|> Stream.with_index()
|> Stream.map(&add_proposer_reward(&1, proposer_index, total_proposer_reward))
|> Enum.map_reduce(committee_deltas, &update_balance/2)

Stream.with_index(committee_indices)
|> Enum.reduce_while({:ok, state}, fn {participant_index, index}, {_, state} ->
if BitVector.set?(sync_committee_bits, index) do
state
|> increase_balance_or_return_error(
participant_index,
participant_reward,
proposer_reward
)
else
{:cont,
{:ok, state |> Mutators.decrease_balance(participant_index, participant_reward)}}
end
end)
else
{:ok, false} -> {:error, "Signature verification failed"}
{:error, message} -> {:error, message}
{:ok, %BeaconState{state | balances: new_balances}}
end
end

defp add_proposer_reward({balance, proposer}, proposer, proposer_reward),
do: {balance + proposer_reward, proposer}

defp add_proposer_reward(v, _, _), do: v

defp update_balance({balance, i}, [{i, delta} | acc]), do: {max(balance + delta, 0), acc}
defp update_balance({balance, _}, acc), do: {balance, acc}

defp verify_signature(pubkeys, message, signature) do
case Bls.eth_fast_aggregate_verify(pubkeys, message, signature) do
{:ok, true} -> :ok
_ -> {:error, "Signature verification failed"}
end
end

defp parse_sync_committee_bits(bits) do
# TODO: Change bitvectors to be in little-endian instead of converting manually
bitsize = bit_size(bits)
<<num::integer-size(bitsize)>> = bits
<<num::integer-little-size(bitsize)>>
end

@spec compute_sync_aggregate_rewards(BeaconState.t()) :: {SszTypes.gwei(), SszTypes.gwei()}
defp compute_sync_aggregate_rewards(state) do
# Compute participant and proposer rewards
total_active_increments =
div(
Accessors.get_total_active_balance(state),
ChainSpec.get("EFFECTIVE_BALANCE_INCREMENT")
)
total_active_balance = Accessors.get_total_active_balance(state)

total_base_rewards =
Accessors.get_base_reward_per_increment(state) * total_active_increments
effective_balance_increment = ChainSpec.get("EFFECTIVE_BALANCE_INCREMENT")

total_active_increments = total_active_balance |> div(effective_balance_increment)

numerator = effective_balance_increment * Constants.base_reward_factor()
denominator = Math.integer_squareroot(total_active_balance)
base_reward_per_increment = div(numerator, denominator)
total_base_rewards = base_reward_per_increment * total_active_increments

max_participant_rewards =
(total_base_rewards * Constants.sync_reward_weight())
Expand All @@ -201,41 +212,14 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do
@spec get_sync_committee_indices(list(Validator.t()), list(SszTypes.bls_pubkey())) ::
list(integer)
defp get_sync_committee_indices(validators, committee_pubkeys) do
# Apply participant and proposer rewards
all_pubkeys =
validators
|> Enum.map(fn %Validator{pubkey: pubkey} -> pubkey end)
|> Stream.map(fn %Validator{pubkey: pubkey} -> pubkey end)
|> Stream.with_index()
|> Map.new()

committee_pubkeys
|> Enum.with_index()
|> Enum.map(fn {public_key, _} ->
Enum.find_index(all_pubkeys, fn x -> x == public_key end)
end)
end

@spec increase_balance_or_return_error(
BeaconState.t(),
SszTypes.validator_index(),
SszTypes.gwei(),
SszTypes.gwei()
) :: {:cont, {:ok, BeaconState.t()}} | {:halt, {:error, String.t()}}
defp increase_balance_or_return_error(
%BeaconState{} = state,
participant_index,
participant_reward,
proposer_reward
) do
case Accessors.get_beacon_proposer_index(state) do
{:ok, proposer_index} ->
{:cont,
{:ok,
state
|> Mutators.increase_balance(participant_index, participant_reward)
|> Mutators.increase_balance(proposer_index, proposer_reward)}}

{:error, _} ->
{:halt, {:error, "Error getting beacon proposer index"}}
end
|> Enum.map(&Map.fetch!(all_pubkeys, &1))
end

@doc """
Expand Down
6 changes: 6 additions & 0 deletions lib/utils/bit_vector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,10 @@ defmodule LambdaEthereumConsensus.Utils.BitVector do
<<remaining::size(bit_size(bit_vector) - steps)-bitstring, _::bitstring>> = bit_vector
<<0::size(steps), remaining::bitstring>>
end

@doc """
Returns the amount of bits set.
"""
@spec count(t) :: non_neg_integer()
def count(bit_vector), do: for(<<bit::1 <- bit_vector>>, do: bit) |> Enum.sum()
end

0 comments on commit 1258d0b

Please sign in to comment.