From 6c60c11bbe1900b01f67248505c2cde6475b0831 Mon Sep 17 00:00:00 2001 From: Akash S M Date: Thu, 30 Nov 2023 19:03:47 +0530 Subject: [PATCH 01/14] feat: implement process_operations (#473) --- .../state_transition/operations.ex | 41 ++++++++++++++++++- .../state_transition/state_transition.ex | 2 +- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/lib/lambda_ethereum_consensus/state_transition/operations.ex b/lib/lambda_ethereum_consensus/state_transition/operations.ex index f45788fcb..ea07f6e8d 100644 --- a/lib/lambda_ethereum_consensus/state_transition/operations.ex +++ b/lib/lambda_ethereum_consensus/state_transition/operations.ex @@ -5,11 +5,11 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do alias LambdaEthereumConsensus.StateTransition.{Accessors, Misc, Mutators, Predicates} alias LambdaEthereumConsensus.Utils.BitVector - alias SszTypes.BeaconBlockBody alias SszTypes.{ Attestation, BeaconBlock, + BeaconBlockBody, BeaconBlockHeader, BeaconState, ExecutionPayload, @@ -1039,4 +1039,43 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do updated_validators end + + @spec process_operations(BeaconState.t(), BeaconBlockBody.t()) :: + {:ok, BeaconState.t()} | {:error, binary} + def process_operations(state, body) do + # Ensure that outstanding deposits are processed up to the maximum number of deposits + with :ok <- verify_deposits(state, body) do + # Define a function that iterates over a list of operations and applies a given function to each element + updated_state = + state + |> for_ops(body.proposer_slashings, &process_proposer_slashing/2) + |> for_ops(body.attester_slashings, &process_attester_slashing/2) + |> for_ops(body.attestations, &process_attestation/2) + |> for_ops(body.deposits, &process_deposit/2) + |> for_ops(body.voluntary_exits, &process_voluntary_exit/2) + |> for_ops(body.bls_to_execution_changes, &process_bls_to_execution_change/2) + + {:ok, updated_state} + end + end + + defp for_ops(state, operations, func) do + Enum.reduce(operations, state, fn operation, acc -> + with {:ok, state} <- func.(acc, operation) do + state + end + end) + end + + @spec verify_deposits(BeaconState.t(), BeaconBlockBody.t()) :: :ok | {:error, binary} + defp verify_deposits(state, body) do + deposit_count = state.eth1_data.deposit_count - state.eth1_deposit_index + deposit_limit = min(ChainSpec.get("MAX_DEPOSITS"), deposit_count) + + if length(body.deposits) == deposit_limit do + :ok + else + {:error, "deposits length mismatch"} + end + end end diff --git a/lib/lambda_ethereum_consensus/state_transition/state_transition.ex b/lib/lambda_ethereum_consensus/state_transition/state_transition.ex index 81fb02f04..cd190b952 100644 --- a/lib/lambda_ethereum_consensus/state_transition/state_transition.ex +++ b/lib/lambda_ethereum_consensus/state_transition/state_transition.ex @@ -126,7 +126,7 @@ defmodule LambdaEthereumConsensus.StateTransition do |> map(&Operations.process_execution_payload(&1, block.body, verify_and_notify_new_payload)) |> map(&Operations.process_randao(&1, block.body)) |> map(&Operations.process_eth1_data(&1, block.body)) - # |> map(&Operations.process_operations(&1, block.body)) + |> map(&Operations.process_operations(&1, block.body)) |> map(&Operations.process_sync_aggregate(&1, block.body.sync_aggregate)) end end From d4fdbff9566a48098a7951190bd32ef6ab3ca6b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 30 Nov 2023 11:03:40 -0300 Subject: [PATCH 02/14] test: add simple `Handlers.on_block` test (#475) --- .../integration/fork_choice/handlers_test.exs | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 test/integration/fork_choice/handlers_test.exs diff --git a/test/integration/fork_choice/handlers_test.exs b/test/integration/fork_choice/handlers_test.exs new file mode 100644 index 000000000..5bd47069c --- /dev/null +++ b/test/integration/fork_choice/handlers_test.exs @@ -0,0 +1,29 @@ +defmodule Integration.ForkChoice.HandlersTest do + use ExUnit.Case + + alias LambdaEthereumConsensus.ForkChoice.Handlers + alias LambdaEthereumConsensus.ForkChoice.Helpers + alias LambdaEthereumConsensus.Store.BlockStore + alias LambdaEthereumConsensus.Store.Db + alias LambdaEthereumConsensus.Store.StateStore + + setup_all do + start_supervised!(Db) + :ok + end + + @tag :skip + test "on_block w/data from DB" do + # NOTE: this test requires a DB with a state, and blocks for the state's slot and the next slot. + # WARN: sometimes fails with "OffsetOutOfBounds" errors. Re-run the test in those cases. + {:ok, state} = StateStore.get_latest_state() + + {:ok, signed_block} = BlockStore.get_block_by_slot(state.slot) + {:ok, new_signed_block} = BlockStore.get_block_by_slot(state.slot + 1) + + assert {:ok, store} = Helpers.get_forkchoice_store(state, signed_block.message) + new_store = Handlers.on_tick(store, :os.system_time(:second)) + + assert {:ok, _} = Handlers.on_block(new_store, new_signed_block) + end +end From 914cb3b97de57e472a60ddc8eb6ecf5f26507d8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 30 Nov 2023 13:59:33 -0300 Subject: [PATCH 03/14] perf: optimize `process_withdrawals` and `process_sync_committee_updates` (#486) --- .../p2p/gossip_handler.ex | 6 +- .../state_transition/accessors.ex | 35 +- .../state_transition/misc.ex | 7 +- .../state_transition/operations.ex | 361 ++++++++---------- .../state_transition/predicates.ex | 5 +- lib/ssz_ex.ex | 5 + lib/utils/bit_vector.ex | 6 + 7 files changed, 193 insertions(+), 232 deletions(-) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip_handler.ex b/lib/lambda_ethereum_consensus/p2p/gossip_handler.ex index dbd79265f..9656bf0c4 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip_handler.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip_handler.ex @@ -7,6 +7,7 @@ defmodule LambdaEthereumConsensus.P2P.GossipHandler do alias LambdaEthereumConsensus.Beacon.PendingBlocks alias LambdaEthereumConsensus.ForkChoice.Store + alias LambdaEthereumConsensus.Utils.BitVector alias SszTypes.{AggregateAndProof, SignedAggregateAndProof, SignedBeaconBlock} @spec handle_message(String.t(), struct) :: :ok @@ -31,7 +32,7 @@ defmodule LambdaEthereumConsensus.P2P.GossipHandler do "/eth2/bba4da96/beacon_aggregate_and_proof/ssz_snappy", %SignedAggregateAndProof{message: %AggregateAndProof{aggregate: aggregate}} ) do - votes = count_bits(aggregate.aggregation_bits) + votes = BitVector.count(aggregate.aggregation_bits) slot = aggregate.data.slot root = aggregate.data.beacon_block_root |> Base.encode16() @@ -49,7 +50,4 @@ defmodule LambdaEthereumConsensus.P2P.GossipHandler do |> then(&"[#{topic_name}] decoded: '#{&1}'") |> Logger.debug() end - - defp count_bits(bitstring), - do: for(<>, do: bit) |> Enum.sum() end diff --git a/lib/lambda_ethereum_consensus/state_transition/accessors.ex b/lib/lambda_ethereum_consensus/state_transition/accessors.ex index abf4403ea..b2f2d04f5 100644 --- a/lib/lambda_ethereum_consensus/state_transition/accessors.ex +++ b/lib/lambda_ethereum_consensus/state_transition/accessors.ex @@ -3,8 +3,9 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do Functions accessing the current `BeaconState` """ + alias LambdaEthereumConsensus.SszEx alias LambdaEthereumConsensus.StateTransition.{Math, Misc, Predicates} - alias SszTypes.{Attestation, BeaconState, IndexedAttestation, SyncCommittee} + alias SszTypes.{Attestation, BeaconState, IndexedAttestation, SyncCommittee, Validator} @doc """ Return the next sync committee, with possible pubkey duplicates. @@ -86,7 +87,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do candidate_index = active_validator_indices |> Enum.fetch!(shuffled_index) <<_::binary-size(rem(index, 32)), random_byte, _::binary>> = - :crypto.hash(:sha256, seed <> Misc.uint64_to_bytes(div(index, 32))) + SszEx.hash(seed <> Misc.uint64_to_bytes(div(index, 32))) effective_balance = Enum.fetch!(validators, candidate_index).effective_balance @@ -104,7 +105,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do """ @spec get_active_validator_indices(BeaconState.t(), SszTypes.epoch()) :: list(SszTypes.validator_index()) - def get_active_validator_indices(%BeaconState{validators: validators} = _state, epoch) do + def get_active_validator_indices(%BeaconState{validators: validators}, epoch) do validators |> Stream.with_index() |> Stream.filter(fn {v, _} -> @@ -184,8 +185,13 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do """ @spec get_total_active_balance(BeaconState.t()) :: SszTypes.gwei() def get_total_active_balance(state) do - active_validator_indices = get_active_validator_indices(state, get_current_epoch(state)) - get_total_balance(state, active_validator_indices) + epoch = get_current_epoch(state) + + state.validators + |> Stream.filter(&Predicates.is_active_validator(&1, epoch)) + |> Stream.map(fn %Validator{effective_balance: effective_balance} -> effective_balance end) + |> Enum.sum() + |> max(ChainSpec.get("EFFECTIVE_BALANCE_INCREMENT")) end @doc """ @@ -227,23 +233,16 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do Return the beacon proposer index at the current slot. """ @spec get_beacon_proposer_index(BeaconState.t()) :: - {:ok, SszTypes.validator_index()} | {:error, binary()} + {:ok, SszTypes.validator_index()} | {:error, String.t()} def get_beacon_proposer_index(state) do epoch = get_current_epoch(state) - seed = - :crypto.hash( - :sha256, - get_seed(state, epoch, Constants.domain_beacon_proposer()) <> - Misc.uint64_to_bytes(state.slot) - ) - indices = get_active_validator_indices(state, epoch) - case Misc.compute_proposer_index(state, indices, seed) do - {:error, msg} -> {:error, msg} - {:ok, i} -> {:ok, i} - end + state + |> get_seed(epoch, Constants.domain_beacon_proposer()) + |> then(&SszEx.hash(&1 <> Misc.uint64_to_bytes(state.slot))) + |> then(&Misc.compute_proposer_index(state, indices, &1)) end @doc """ @@ -412,7 +411,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do ChainSpec.get("MIN_SEED_LOOKAHEAD") - 1 ) - :crypto.hash(:sha256, domain_type <> Misc.uint64_to_bytes(epoch) <> mix) + SszEx.hash(domain_type <> Misc.uint64_to_bytes(epoch) <> mix) end @doc """ diff --git a/lib/lambda_ethereum_consensus/state_transition/misc.ex b/lib/lambda_ethereum_consensus/state_transition/misc.ex index 0cade1233..dd029d20e 100644 --- a/lib/lambda_ethereum_consensus/state_transition/misc.ex +++ b/lib/lambda_ethereum_consensus/state_transition/misc.ex @@ -3,6 +3,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Misc do Misc functions """ + alias LambdaEthereumConsensus.SszEx alias SszTypes.BeaconState import Bitwise alias SszTypes.BeaconState @@ -55,7 +56,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Misc do Enum.reduce(0..(shuffle_round_count - 1), index, fn round, current_index -> round_as_bytes = <> - hash_of_seed_round = :crypto.hash(:sha256, seed <> round_as_bytes) + hash_of_seed_round = SszEx.hash(seed <> round_as_bytes) pivot = rem(bytes_to_uint64(hash_of_seed_round), index_count) @@ -65,7 +66,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Misc do position_div_256 = uint_to_bytes4(div(position, 256)) source = - :crypto.hash(:sha256, seed <> round_as_bytes <> position_div_256) + SszEx.hash(seed <> round_as_bytes <> position_div_256) byte_index = div(rem(position, 256), 8) <<_::binary-size(byte_index), byte, _::binary>> = source @@ -146,7 +147,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Misc do total = length(indices) {:ok, i} = compute_shuffled_index(rem(i, total), total, seed) candidate_index = Enum.at(indices, i) - random_byte = :crypto.hash(:sha256, seed <> uint_to_bytes4(div(i, 32))) + random_byte = SszEx.hash(seed <> uint_to_bytes4(div(i, 32))) <<_::binary-size(rem(i, 32)), byte, _::binary>> = random_byte effective_balance = Enum.at(state.validators, candidate_index).effective_balance diff --git a/lib/lambda_ethereum_consensus/state_transition/operations.ex b/lib/lambda_ethereum_consensus/state_transition/operations.ex index ea07f6e8d..3d0a0766d 100644 --- a/lib/lambda_ethereum_consensus/state_transition/operations.ex +++ b/lib/lambda_ethereum_consensus/state_transition/operations.ex @@ -3,7 +3,8 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do This module contains functions for handling state transition """ - alias LambdaEthereumConsensus.StateTransition.{Accessors, Misc, Mutators, Predicates} + alias LambdaEthereumConsensus.SszEx + alias LambdaEthereumConsensus.StateTransition.{Accessors, Math, Misc, Mutators, Predicates} alias LambdaEthereumConsensus.Utils.BitVector alias SszTypes.{ @@ -109,80 +110,91 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do @spec process_sync_aggregate(BeaconState.t(), SyncAggregate.t()) :: {:ok, BeaconState.t()} | {:error, String.t()} - def process_sync_aggregate( - %BeaconState{ - slot: slot, - current_sync_committee: current_sync_committee, - validators: validators - } = state, - %SyncAggregate{ - sync_committee_bits: sync_committee_bits, - sync_committee_signature: sync_committee_signature - } - ) do + def process_sync_aggregate(%BeaconState{} = state, %SyncAggregate{} = aggregate) do # Verify sync committee aggregate signature signing over the previous slot block root - committee_pubkeys = current_sync_committee.pubkeys - - # TODO: Change bitvectors to be in little-endian instead of converting manually - sync_committee_bits_as_num = sync_committee_bits |> :binary.decode_unsigned() - - sync_committee_bits = - <> + committee_pubkeys = state.current_sync_committee.pubkeys + sync_committee_bits = parse_sync_committee_bits(aggregate.sync_committee_bits) participant_pubkeys = - Enum.with_index(committee_pubkeys) + committee_pubkeys + |> Enum.with_index() |> Enum.filter(fn {_, index} -> BitVector.set?(sync_committee_bits, index) end) |> Enum.map(fn {public_key, _} -> public_key end) - previous_slot = max(slot, 1) - 1 + previous_slot = max(state.slot, 1) - 1 epoch = Misc.compute_epoch_at_slot(previous_slot) domain = Accessors.get_domain(state, Constants.domain_sync_committee(), epoch) with {:ok, block_root} <- Accessors.get_block_root_at_slot(state, previous_slot), signing_root <- Misc.compute_signing_root(block_root, domain), - {:ok, true} <- - Bls.eth_fast_aggregate_verify( - participant_pubkeys, - signing_root, - sync_committee_signature - ) do + :ok <- + verify_signature(participant_pubkeys, signing_root, aggregate.sync_committee_signature), + {:ok, proposer_index} <- Accessors.get_beacon_proposer_index(state) do # Compute participant and proposer rewards {participant_reward, proposer_reward} = compute_sync_aggregate_rewards(state) + total_proposer_reward = BitVector.count(sync_committee_bits) * proposer_reward + + # PERF: make Map with committee_index by pubkey, then + # Enum.map validators -> new balance all in place, without map_reduce + committee_deltas = + state.validators + |> get_sync_committee_indices(committee_pubkeys) + |> Stream.with_index() + |> Stream.map(fn {validator_index, committee_index} -> + if BitVector.set?(sync_committee_bits, committee_index), + do: {validator_index, participant_reward}, + else: {validator_index, -participant_reward} + end) + |> Enum.sort(fn {vi1, _}, {vi2, _} -> vi1 <= vi2 end) + # Apply participant and proposer rewards - committee_indices = get_sync_committee_indices(validators, committee_pubkeys) + {new_balances, []} = + state.balances + |> Stream.with_index() + |> Stream.map(&add_proposer_reward(&1, proposer_index, total_proposer_reward)) + |> Enum.map_reduce(committee_deltas, &update_balance/2) - Stream.with_index(committee_indices) - |> Enum.reduce_while({:ok, state}, fn {participant_index, index}, {_, state} -> - if BitVector.set?(sync_committee_bits, index) do - state - |> increase_balance_or_return_error( - participant_index, - participant_reward, - proposer_reward - ) - else - {:cont, - {:ok, state |> Mutators.decrease_balance(participant_index, participant_reward)}} - end - end) - else - {:ok, false} -> {:error, "Signature verification failed"} - {:error, message} -> {:error, message} + {:ok, %BeaconState{state | balances: new_balances}} end end + defp add_proposer_reward({balance, proposer}, proposer, proposer_reward), + do: {balance + proposer_reward, proposer} + + defp add_proposer_reward(v, _, _), do: v + + defp update_balance({balance, i}, [{i, delta} | acc]), + do: update_balance({max(balance + delta, 0), i}, acc) + + defp update_balance({balance, _}, acc), do: {balance, acc} + + defp verify_signature(pubkeys, message, signature) do + case Bls.eth_fast_aggregate_verify(pubkeys, message, signature) do + {:ok, true} -> :ok + _ -> {:error, "Signature verification failed"} + end + end + + defp parse_sync_committee_bits(bits) do + # TODO: Change bitvectors to be in little-endian instead of converting manually + bitsize = bit_size(bits) + <> = bits + <> + end + @spec compute_sync_aggregate_rewards(BeaconState.t()) :: {SszTypes.gwei(), SszTypes.gwei()} defp compute_sync_aggregate_rewards(state) do # Compute participant and proposer rewards - total_active_increments = - div( - Accessors.get_total_active_balance(state), - ChainSpec.get("EFFECTIVE_BALANCE_INCREMENT") - ) + total_active_balance = Accessors.get_total_active_balance(state) - total_base_rewards = - Accessors.get_base_reward_per_increment(state) * total_active_increments + effective_balance_increment = ChainSpec.get("EFFECTIVE_BALANCE_INCREMENT") + total_active_increments = total_active_balance |> div(effective_balance_increment) + + numerator = effective_balance_increment * Constants.base_reward_factor() + denominator = Math.integer_squareroot(total_active_balance) + base_reward_per_increment = div(numerator, denominator) + total_base_rewards = base_reward_per_increment * total_active_increments max_participant_rewards = (total_base_rewards * Constants.sync_reward_weight()) @@ -201,41 +213,14 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do @spec get_sync_committee_indices(list(Validator.t()), list(SszTypes.bls_pubkey())) :: list(integer) defp get_sync_committee_indices(validators, committee_pubkeys) do - # Apply participant and proposer rewards all_pubkeys = validators - |> Enum.map(fn %Validator{pubkey: pubkey} -> pubkey end) + |> Stream.map(fn %Validator{pubkey: pubkey} -> pubkey end) + |> Stream.with_index() + |> Map.new() committee_pubkeys - |> Enum.with_index() - |> Enum.map(fn {public_key, _} -> - Enum.find_index(all_pubkeys, fn x -> x == public_key end) - end) - end - - @spec increase_balance_or_return_error( - BeaconState.t(), - SszTypes.validator_index(), - SszTypes.gwei(), - SszTypes.gwei() - ) :: {:cont, {:ok, BeaconState.t()}} | {:halt, {:error, String.t()}} - defp increase_balance_or_return_error( - %BeaconState{} = state, - participant_index, - participant_reward, - proposer_reward - ) do - case Accessors.get_beacon_proposer_index(state) do - {:ok, proposer_index} -> - {:cont, - {:ok, - state - |> Mutators.increase_balance(participant_index, participant_reward) - |> Mutators.increase_balance(proposer_index, proposer_reward)}} - - {:error, _} -> - {:halt, {:error, "Error getting beacon proposer index"}} - end + |> Enum.map(&Map.fetch!(all_pubkeys, &1)) end @doc """ @@ -317,151 +302,117 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do ) do expected_withdrawals = get_expected_withdrawals(state) - length_of_validators = length(validators) - - with {:ok, state} <- decrease_balances(state, withdrawals, expected_withdrawals) do - {:ok, - state - |> update_next_withdrawal_index(expected_withdrawals) - |> update_next_withdrawal_validator_index(expected_withdrawals, length_of_validators)} + with :ok <- check_withdrawals(withdrawals, expected_withdrawals) do + state + |> decrease_balances(withdrawals) + |> update_next_withdrawal_index(withdrawals) + |> update_next_withdrawal_validator_index(withdrawals, length(validators)) + |> then(&{:ok, &1}) end end + # Update the next withdrawal index if this block contained withdrawals @spec update_next_withdrawal_index(BeaconState.t(), list(Withdrawal.t())) :: BeaconState.t() - defp update_next_withdrawal_index(state, expected_withdrawals) do - # Update the next withdrawal index if this block contained withdrawals - length_of_expected_withdrawals = length(expected_withdrawals) + defp update_next_withdrawal_index(state, []), do: state - case length_of_expected_withdrawals != 0 do - true -> - latest_withdrawal = List.last(expected_withdrawals) - %BeaconState{state | next_withdrawal_index: latest_withdrawal.index + 1} - - false -> - state - end + defp update_next_withdrawal_index(state, withdrawals) do + latest_withdrawal = List.last(withdrawals) + %BeaconState{state | next_withdrawal_index: latest_withdrawal.index + 1} end - @spec update_next_withdrawal_validator_index(BeaconState.t(), list(Withdrawal.t()), integer) :: + @spec update_next_withdrawal_validator_index(BeaconState.t(), list(Withdrawal.t()), integer()) :: BeaconState.t() - defp update_next_withdrawal_validator_index(state, expected_withdrawals, length_of_validators) do - length_of_expected_withdrawals = length(expected_withdrawals) + defp update_next_withdrawal_validator_index(state, withdrawals, validator_len) do + next_index = + if length(withdrawals) == ChainSpec.get("MAX_WITHDRAWALS_PER_PAYLOAD") do + # Update the next validator index to start the next withdrawal sweep + latest_withdrawal = List.last(withdrawals) + latest_withdrawal.validator_index + 1 + else + # Advance sweep by the max length of the sweep if there was not a full set of withdrawals + state.next_withdrawal_validator_index + + ChainSpec.get("MAX_VALIDATORS_PER_WITHDRAWALS_SWEEP") + end - case length_of_expected_withdrawals == ChainSpec.get("MAX_WITHDRAWALS_PER_PAYLOAD") do - # Update the next validator index to start the next withdrawal sweep - true -> - latest_withdrawal = List.last(expected_withdrawals) - next_validator_index = rem(latest_withdrawal.validator_index + 1, length_of_validators) - %BeaconState{state | next_withdrawal_validator_index: next_validator_index} - - # Advance sweep by the max length of the sweep if there was not a full set of withdrawals - false -> - next_index = - state.next_withdrawal_validator_index + - ChainSpec.get("MAX_VALIDATORS_PER_WITHDRAWALS_SWEEP") - - next_validator_index = rem(next_index, length_of_validators) - %BeaconState{state | next_withdrawal_validator_index: next_validator_index} - end + next_validator_index = rem(next_index, validator_len) + %BeaconState{state | next_withdrawal_validator_index: next_validator_index} end - @spec decrease_balances(BeaconState.t(), list(Withdrawal.t()), list(Withdrawal.t())) :: - {:ok, BeaconState.t()} | {:error, String.t()} - defp decrease_balances(_state, withdrawals, expected_withdrawals) + @spec check_withdrawals(list(Withdrawal.t()), list(Withdrawal.t())) :: + :ok | {:error, String.t()} + defp check_withdrawals(withdrawals, expected_withdrawals) when length(withdrawals) !== length(expected_withdrawals) do {:error, "expected withdrawals don't match the state withdrawals in length"} end - @spec decrease_balances(BeaconState.t(), list(Withdrawal.t()), list(Withdrawal.t())) :: - {:ok, BeaconState.t()} | {:error, String.t()} - defp decrease_balances(state, withdrawals, expected_withdrawals) do - Enum.zip(expected_withdrawals, withdrawals) - |> Enum.reduce_while({:ok, state}, &decrease_or_halt/2) + defp check_withdrawals(withdrawals, expected_withdrawals) do + Stream.zip(expected_withdrawals, withdrawals) + |> Enum.all?(fn {expected_withdrawal, withdrawal} -> + expected_withdrawal == withdrawal + end) + |> then(&if &1, do: :ok, else: {:error, "withdrawal doesn't match expected withdrawal"}) end - defp decrease_or_halt({expected_withdrawal, withdrawal}, _) - when expected_withdrawal !== withdrawal do - {:halt, {:error, "withdrawal != expected_withdrawal"}} - end + @spec decrease_balances(BeaconState.t(), list(Withdrawal.t())) :: BeaconState.t() + defp decrease_balances(state, withdrawals) do + withdrawals = Enum.sort(withdrawals, &(&1.validator_index <= &2.validator_index)) - defp decrease_or_halt({_, withdrawal}, {:ok, state}) do - {:cont, - {:ok, BeaconState.decrease_balance(state, withdrawal.validator_index, withdrawal.amount)}} + state.balances + |> Stream.with_index() + |> Enum.map_reduce(withdrawals, &maybe_decrease_balance/2) + |> then(fn {balances, []} -> %BeaconState{state | balances: balances} end) end + defp maybe_decrease_balance({balance, index}, [ + %Withdrawal{validator_index: index, amount: amount} | remaining + ]), + do: {max(balance - amount, 0), remaining} + + defp maybe_decrease_balance({balance, _index}, acc), do: {balance, acc} + @spec get_expected_withdrawals(BeaconState.t()) :: list(Withdrawal.t()) - defp get_expected_withdrawals( - %BeaconState{ - next_withdrawal_index: next_withdrawal_index, - next_withdrawal_validator_index: next_withdrawal_validator_index, - validators: validators, - balances: balances - } = state - ) do + defp get_expected_withdrawals(%BeaconState{} = state) do # Compute the next batch of withdrawals which should be included in a block. epoch = Accessors.get_current_epoch(state) - withdrawal_index = next_withdrawal_index - validator_index = next_withdrawal_validator_index + max_validators_per_withdrawals_sweep = ChainSpec.get("MAX_VALIDATORS_PER_WITHDRAWALS_SWEEP") - bound = min(length(validators), max_validators_per_withdrawals_sweep) - - {withdrawals, _, _} = - Enum.reduce_while(0..(bound - 1), {[], validator_index, withdrawal_index}, fn _, - {withdrawals, - validator_index, - withdrawal_index} -> - validator = Enum.fetch!(validators, validator_index) - balance = Enum.fetch!(balances, validator_index) - %Validator{withdrawal_credentials: withdrawal_credentials} = validator - - {withdrawals, withdrawal_index} = - cond do - Validator.is_fully_withdrawable_validator(validator, balance, epoch) -> - <<_::binary-size(12), execution_address::binary>> = withdrawal_credentials - - withdrawal = %Withdrawal{ - index: withdrawal_index, - validator_index: validator_index, - address: execution_address, - amount: balance - } - - withdrawals = [withdrawal | withdrawals] - withdrawal_index = withdrawal_index + 1 - - {withdrawals, withdrawal_index} - - Validator.is_partially_withdrawable_validator(validator, balance) -> - <<_::binary-size(12), execution_address::binary>> = withdrawal_credentials - max_effective_balance = ChainSpec.get("MAX_EFFECTIVE_BALANCE") - - withdrawal = %Withdrawal{ - index: withdrawal_index, - validator_index: validator_index, - address: execution_address, - amount: balance - max_effective_balance - } - - withdrawals = [withdrawal | withdrawals] - withdrawal_index = withdrawal_index + 1 - - {withdrawals, withdrawal_index} - - true -> - {withdrawals, withdrawal_index} - end - - max_withdrawals_per_payload = ChainSpec.get("MAX_WITHDRAWALS_PER_PAYLOAD") - - if length(withdrawals) == max_withdrawals_per_payload do - {:halt, {withdrawals, validator_index, withdrawal_index}} - else - validator_index = rem(validator_index + 1, length(validators)) - {:cont, {withdrawals, validator_index, withdrawal_index}} - end - end) + max_withdrawals_per_payload = ChainSpec.get("MAX_WITHDRAWALS_PER_PAYLOAD") + max_effective_balance = ChainSpec.get("MAX_EFFECTIVE_BALANCE") + + bound = min(length(state.validators), max_validators_per_withdrawals_sweep) - Enum.reverse(withdrawals) + Stream.zip([state.validators, state.balances]) + |> Stream.with_index() + |> Stream.cycle() + |> Stream.drop(state.next_withdrawal_validator_index) + |> Stream.take(bound) + |> Stream.map(fn {{validator, balance}, index} -> + cond do + Validator.is_fully_withdrawable_validator(validator, balance, epoch) -> + {validator, balance, index} + + Validator.is_partially_withdrawable_validator(validator, balance) -> + {validator, balance - max_effective_balance, index} + + true -> + nil + end + end) + |> Stream.reject(&is_nil/1) + |> Stream.with_index() + |> Stream.map(fn {{validator, balance, validator_index}, index} -> + %Validator{withdrawal_credentials: withdrawal_credentials} = validator + + <<_::binary-size(12), execution_address::binary>> = withdrawal_credentials + + %Withdrawal{ + index: index + state.next_withdrawal_index, + validator_index: validator_index, + address: execution_address, + amount: balance + } + end) + |> Enum.take(max_withdrawals_per_payload) end @spec process_proposer_slashing(BeaconState.t(), SszTypes.ProposerSlashing.t()) :: @@ -832,7 +783,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do if Bls.valid?(proposer.pubkey, signing_root, randao_reveal) do randao_mix = Accessors.get_randao_mix(state, epoch) - hash = :crypto.hash(:sha256, randao_reveal) + hash = SszEx.hash(randao_reveal) # Mix in RANDAO reveal mix = :crypto.exor(randao_mix, hash) @@ -1013,7 +964,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do defp validate_withdrawal_credentials(validator, address_change) do <> = validator.withdrawal_credentials - <<_, hash::binary-size(31)>> = :crypto.hash(:sha256, address_change.from_bls_pubkey) + <<_, hash::binary-size(31)>> = SszEx.hash(address_change.from_bls_pubkey) if prefix == Constants.bls_withdrawal_prefix() and address == hash do {:ok} diff --git a/lib/lambda_ethereum_consensus/state_transition/predicates.ex b/lib/lambda_ethereum_consensus/state_transition/predicates.ex index 3a38ff0db..73ffe288e 100644 --- a/lib/lambda_ethereum_consensus/state_transition/predicates.ex +++ b/lib/lambda_ethereum_consensus/state_transition/predicates.ex @@ -3,6 +3,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Predicates do Range of predicates enabling verification of state """ + alias LambdaEthereumConsensus.SszEx alias LambdaEthereumConsensus.StateTransition.{Accessors, Misc} alias SszTypes.BeaconState alias SszTypes.Validator @@ -131,9 +132,9 @@ defmodule LambdaEthereumConsensus.StateTransition.Predicates do defp hash_merkle_node(value_1, value_2, index, i) do if rem(div(index, 2 ** i), 2) == 1 do - :crypto.hash(:sha256, value_1 <> value_2) + SszEx.hash(value_1 <> value_2) else - :crypto.hash(:sha256, value_2 <> value_1) + SszEx.hash(value_2 <> value_1) end end diff --git a/lib/ssz_ex.ex b/lib/ssz_ex.ex index 85acb55eb..e46db263c 100644 --- a/lib/ssz_ex.ex +++ b/lib/ssz_ex.ex @@ -2,9 +2,14 @@ defmodule LambdaEthereumConsensus.SszEx do @moduledoc """ SSZ library in Elixir """ + ################# ### Public API ################# + + @spec hash(iodata()) :: binary() + def hash(data), do: :crypto.hash(:sha256, data) + def encode(value, {:int, size}), do: encode_int(value, size) def encode(value, :bool), do: encode_bool(value) diff --git a/lib/utils/bit_vector.ex b/lib/utils/bit_vector.ex index 3af1e1d44..5225e1f27 100644 --- a/lib/utils/bit_vector.ex +++ b/lib/utils/bit_vector.ex @@ -92,4 +92,10 @@ defmodule LambdaEthereumConsensus.Utils.BitVector do <> = bit_vector <<0::size(steps), remaining::bitstring>> end + + @doc """ + Returns the amount of bits set. + """ + @spec count(t) :: non_neg_integer() + def count(bit_vector), do: for(<>, do: bit) |> Enum.sum() end From 9e6b9b413e04dbbf75db5428e072bce10351a6ee Mon Sep 17 00:00:00 2001 From: Martin Paulucci Date: Thu, 30 Nov 2023 18:07:47 +0100 Subject: [PATCH 04/14] test: add finality, sync tests and enable several sanity cases. (#487) --- lib/spec/runners/finality.ex | 26 ++++ lib/spec/runners/helpers/process_blocks.ex | 57 +++++++ lib/spec/runners/sanity.ex | 169 ++++++++------------- lib/spec/runners/sync.ex | 22 +++ 4 files changed, 168 insertions(+), 106 deletions(-) create mode 100644 lib/spec/runners/finality.ex create mode 100644 lib/spec/runners/helpers/process_blocks.ex create mode 100644 lib/spec/runners/sync.ex diff --git a/lib/spec/runners/finality.ex b/lib/spec/runners/finality.ex new file mode 100644 index 000000000..6dda034d0 --- /dev/null +++ b/lib/spec/runners/finality.ex @@ -0,0 +1,26 @@ +defmodule FinalityTestRunner do + @moduledoc """ + Runner for finality test cases. See: https://github.com/ethereum/consensus-specs/tree/dev/tests/formats/finality + """ + + use ExUnit.CaseTemplate + use TestRunner + + @disabled_cases [ + # "finality_no_updates_at_genesis", + # "finality_rule_1", + # "finality_rule_2", + # "finality_rule_3", + # "finality_rule_4" + ] + + @impl TestRunner + def skip?(%SpecTestCase{fork: "capella", case: testcase}) do + Enum.member?(@disabled_cases, testcase) + end + + @impl TestRunner + def run_test_case(testcase) do + Helpers.ProcessBlocks.process_blocks(testcase) + end +end diff --git a/lib/spec/runners/helpers/process_blocks.ex b/lib/spec/runners/helpers/process_blocks.ex new file mode 100644 index 000000000..ad43e38f9 --- /dev/null +++ b/lib/spec/runners/helpers/process_blocks.ex @@ -0,0 +1,57 @@ +defmodule Helpers.ProcessBlocks do + @moduledoc """ + Helper module for processing blocks. + """ + + use ExUnit.CaseTemplate + + alias LambdaEthereumConsensus.StateTransition + alias LambdaEthereumConsensus.Utils.Diff + + def process_blocks(%SpecTestCase{} = testcase) do + case_dir = SpecTestCase.dir(testcase) + + pre = + SpecTestUtils.read_ssz_from_file!( + case_dir <> "/pre.ssz_snappy", + SszTypes.BeaconState + ) + + post = + SpecTestUtils.read_ssz_from_optional_file!( + case_dir <> "/post.ssz_snappy", + SszTypes.BeaconState + ) + + meta = + YamlElixir.read_from_file!(case_dir <> "/meta.yaml") |> SpecTestUtils.sanitize_yaml() + + %{blocks_count: blocks_count} = meta + + blocks = + 0..(blocks_count - 1)//1 + |> Enum.map(fn index -> + SpecTestUtils.read_ssz_from_file!( + case_dir <> "/blocks_#{index}.ssz_snappy", + SszTypes.SignedBeaconBlock + ) + end) + + result = + blocks + |> Enum.reduce_while({:ok, pre}, fn block, {:ok, state} -> + case StateTransition.state_transition(state, block, true) do + {:ok, post_state} -> {:cont, {:ok, post_state}} + {:error, error} -> {:halt, {:error, error}} + end + end) + + case result do + {:ok, state} -> + assert Diff.diff(state, post) == :unchanged + + {:error, error} -> + assert post == nil, "Process block failed, error: #{error}" + end + end +end diff --git a/lib/spec/runners/sanity.ex b/lib/spec/runners/sanity.ex index 0094816b9..6535858a2 100644 --- a/lib/spec/runners/sanity.ex +++ b/lib/spec/runners/sanity.ex @@ -10,82 +10,83 @@ defmodule SanityTestRunner do alias LambdaEthereumConsensus.Utils.Diff @disabled_block_cases [ - "activate_and_partial_withdrawal_max_effective_balance", - "activate_and_partial_withdrawal_overdeposit", - "attestation", - "attester_slashing", - "balance_driven_status_transitions", - "bls_change", - "deposit_and_bls_change", - "deposit_in_block", - "deposit_top_up", - "duplicate_attestation_same_block", - "empty_block_transition", - "empty_block_transition_large_validator_set", - "empty_block_transition_no_tx", - "empty_block_transition_randomized_payload", - "empty_epoch_transition", - "empty_epoch_transition_large_validator_set", - "empty_epoch_transition_not_finalizing", - "eth1_data_votes_consensus", - "eth1_data_votes_no_consensus", - "exit_and_bls_change", - "full_random_operations_0", - "full_random_operations_1", - "full_random_operations_2", - "full_random_operations_3", - "full_withdrawal_in_epoch_transition", - "high_proposer_index", - "historical_batch", - "inactivity_scores_full_participation_leaking", - "inactivity_scores_leaking", + # "activate_and_partial_withdrawal_max_effective_balance", + # "activate_and_partial_withdrawal_overdeposit", + # "attestation", + # "attester_slashing", + # "balance_driven_status_transitions", + # "bls_change", + # "deposit_and_bls_change", + # "deposit_in_block", + # "deposit_top_up", + # "duplicate_attestation_same_block", + # "invalid_is_execution_enabled_false", + # "empty_block_transition", + # "empty_block_transition_large_validator_set", + # "empty_block_transition_no_tx", + # "empty_block_transition_randomized_payload", + # "empty_epoch_transition", + # "empty_epoch_transition_large_validator_set", + # "empty_epoch_transition_not_finalizing", + # "eth1_data_votes_consensus", + # "eth1_data_votes_no_consensus", + # "exit_and_bls_change", + # "full_random_operations_0", + # "full_random_operations_1", + # "full_random_operations_2", + # "full_random_operations_3", + # "full_withdrawal_in_epoch_transition", + # "high_proposer_index", + # "historical_batch", + # "inactivity_scores_full_participation_leaking", + # "inactivity_scores_leaking", "invalid_all_zeroed_sig", "invalid_duplicate_attester_slashing_same_block", "invalid_duplicate_bls_changes_same_block", - "invalid_duplicate_deposit_same_block", + # "invalid_duplicate_deposit_same_block", "invalid_duplicate_proposer_slashings_same_block", "invalid_duplicate_validator_exit_same_block", "invalid_incorrect_block_sig", - "invalid_incorrect_proposer_index_sig_from_expected_proposer", - "invalid_incorrect_proposer_index_sig_from_proposer_index", + # "invalid_incorrect_proposer_index_sig_from_expected_proposer", + # "invalid_incorrect_proposer_index_sig_from_proposer_index", "invalid_incorrect_state_root", - "invalid_only_increase_deposit_count", - "invalid_parent_from_same_slot", - "invalid_prev_slot_block_transition", - "invalid_same_slot_block_transition", + # "invalid_only_increase_deposit_count", + # "invalid_parent_from_same_slot", + # "invalid_prev_slot_block_transition", + # "invalid_same_slot_block_transition", "invalid_similar_proposer_slashings_same_block", "invalid_two_bls_changes_of_different_addresses_same_validator_same_block", - "invalid_withdrawal_fail_second_block_payload_isnt_compatible", - "is_execution_enabled_false", - "many_partial_withdrawals_in_epoch_transition", - "multiple_attester_slashings_no_overlap", - "multiple_attester_slashings_partial_overlap", - "multiple_different_proposer_slashings_same_block", - "multiple_different_validator_exits_same_block", - "partial_withdrawal_in_epoch_transition", - "proposer_after_inactive_index", - "proposer_self_slashing", - "proposer_slashing", - "skipped_slots", - "slash_and_exit_diff_index", - "slash_and_exit_same_index", - "sync_committee_committee__empty", - "sync_committee_committee__full", - "sync_committee_committee__half", - "sync_committee_committee_genesis__empty", - "sync_committee_committee_genesis__full", - "sync_committee_committee_genesis__half", - "top_up_and_partial_withdrawable_validator", - "top_up_to_fully_withdrawn_validator", - "voluntary_exit", - "withdrawal_success_two_blocks" + # "invalid_withdrawal_fail_second_block_payload_isnt_compatible", + # "is_execution_enabled_false", + # "many_partial_withdrawals_in_epoch_transition", + # "multiple_attester_slashings_no_overlap", + # "multiple_attester_slashings_partial_overlap", + # "multiple_different_proposer_slashings_same_block", + # "multiple_different_validator_exits_same_block", + # "partial_withdrawal_in_epoch_transition", + # "proposer_after_inactive_index", + # "proposer_self_slashing", + # "proposer_slashing", + # "skipped_slots", + # "slash_and_exit_diff_index", + "slash_and_exit_same_index" + # "sync_committee_committee__empty", + # "sync_committee_committee__full", + # "sync_committee_committee__half", + # "sync_committee_committee_genesis__empty", + # "sync_committee_committee_genesis__full", + # "sync_committee_committee_genesis__half", + # "top_up_and_partial_withdrawable_validator", + # "top_up_to_fully_withdrawn_validator", + # "voluntary_exit", + # "withdrawal_success_two_blocks" ] @disabled_slot_cases [ # "empty_epoch", # "slots_1", # "slots_2", - "over_epoch_boundary", + # "over_epoch_boundary", "historical_accumulator", "double_empty_epoch" ] @@ -135,50 +136,6 @@ defmodule SanityTestRunner do @impl TestRunner def run_test_case(%SpecTestCase{handler: "blocks"} = testcase) do # TODO process meta.yaml - case_dir = SpecTestCase.dir(testcase) - - pre = - SpecTestUtils.read_ssz_from_file!( - case_dir <> "/pre.ssz_snappy", - SszTypes.BeaconState - ) - - post = - SpecTestUtils.read_ssz_from_optional_file!( - case_dir <> "/post.ssz_snappy", - SszTypes.BeaconState - ) - - meta = - YamlElixir.read_from_file!(case_dir <> "/meta.yaml") |> SpecTestUtils.sanitize_yaml() - - %{blocks_count: blocks_count} = meta - - blocks = - 0..(blocks_count - 1)//1 - |> Enum.map(fn index -> - SpecTestUtils.read_ssz_from_file!( - case_dir <> "/blocks_#{index}.ssz_snappy", - SszTypes.SignedBeaconBlock - ) - end) - |> Enum.map(& &1.message) - - result = - blocks - |> Enum.reduce_while({:ok, pre}, fn block, {:ok, state} -> - case StateTransition.process_block(state, block) do - {:ok, post_state} -> {:cont, {:ok, post_state}} - {:error, error} -> {:halt, {:error, error}} - end - end) - - case result do - {:ok, state} -> - assert Diff.diff(state, post) == :unchanged - - {:error, error} -> - assert post == nil, "Process block failed, error: #{error}" - end + Helpers.ProcessBlocks.process_blocks(testcase) end end diff --git a/lib/spec/runners/sync.ex b/lib/spec/runners/sync.ex new file mode 100644 index 000000000..e884f7401 --- /dev/null +++ b/lib/spec/runners/sync.ex @@ -0,0 +1,22 @@ +defmodule SyncTestRunner do + @moduledoc """ + Runner for Operations test cases. See: https://github.com/ethereum/consensus-specs/tree/dev/tests/formats/sync + """ + + use ExUnit.CaseTemplate + use TestRunner + + @disabled_cases [ + "from_syncing_to_invalid" + ] + + @impl TestRunner + def skip?(%SpecTestCase{} = testcase) do + Enum.member?(@disabled_cases, testcase.case) + end + + @impl TestRunner + def run_test_case(%SpecTestCase{} = testcase) do + ForkChoiceTestRunner.run_test_case(testcase) + end +end From 762f6f3958c385f71fb275d9c039b891fc7698cd Mon Sep 17 00:00:00 2001 From: Martin Paulucci Date: Thu, 30 Nov 2023 18:11:58 +0100 Subject: [PATCH 05/14] docs: remove outdated roadmap (#488) --- README.md | 60 ------------------------------------------------------- 1 file changed, 60 deletions(-) diff --git a/README.md b/README.md index c9cebae31..424a827b8 100644 --- a/README.md +++ b/README.md @@ -167,66 +167,6 @@ Our aim is to infuse these strengths into the Ethereum consensus client ecosyste We also have for objective to bootstart an Ethereum Elixir community, and to make Elixir a first-class citizen in the Ethereum ecosystem. -## Roadmap - -**1. Block Subscription - Mid September** - - Libp2p discovery and block retrieval - - SSZ + snappy - - `on_block` callback: Save the latest block in fork-choice store, conduct basic checks. GetHead returns the last obtained block. - - Beacon API: Return block root (`GET /beacon/states/{state_id}/root`) - -**2. Checkpoint Sync - October** - - Libp2p primitives for sync - - Support checkpoint Sync from a known provider - - Sync from the latest finalized block - - BeaconAPI: Return headers for head block - - EngineAPI: Validate incoming blocks - -**3. Attestations - Mid October** - - Libp2p attestation retrieval - - Basic beacon state representation - - Store attestations (last message sent by each validator) - - `on_attestation` callback for attestations sent via Gossip - - Process attestations from blocks - - Beacon API: Return head block root (`GET /beacon/states/head/root`) - -**4. Deposits - November** - - BLS signature checks - - Update consensus state view of deposit contract (`process_eth1_data`) - - Process deposit operation to update validator list (`process_deposit`) - - Verify block signatures (`verify_block_signature`) - -**5. Slots and Fork-choice - Mid November** - - `on_tick`/`process_slot` in state transition; a GenServer that calls this periodically - - `on_block`: Add slot-related checks and epoch calculations (excluding finalization) - - Get-head uses the messages - - Block header validation - - EngineAPI: Process execution payload - - BeaconAPI: Ensure getting head values point to the heaviest - -**6. Finality and Slashing - Mid November** - - Epoch processing - - `on_block`: Prune fork-choice store; reject blocks before finalization - - Add RANDAO mix to the beacon state - - BeaconAPI: Retrieve finality checkpoints, randao mixes - - Process attester slashings and proposer slashings - - EngineAPI: fork-choice updates - -**7. Rewards, Shuffling - December** - - Process rewards `on_epoch` for a checkpoint - - Handle Deposits and Withdrawals - - Implement RANDAO - - Calculate committee for a given state - - Conduct shuffling - - Integrate with Grafana - - BeaconAPI: Retrieve randao mix for a given block - -**8. Validator Features - Mid December/January 2024** - - Create attestations - - Monitor for slashings - - Create slashing proofs - - BeaconAPI: Post blocks, slashings, voluntary exits, and withdrawals - ## Contributor Package Dream of becoming an Ethereum core developer? Eager to shape the protocol that will underpin tomorrow's world? Want to collaborate with a passionate team, learn, grow, and be a pivotal part of the Ethereum Elixir community? From 8b7a60e12a3b806356c0a2f3048b7576650a2328 Mon Sep 17 00:00:00 2001 From: Martin Paulucci Date: Thu, 30 Nov 2023 18:40:46 +0100 Subject: [PATCH 06/14] test: implement random test runner. (#489) --- lib/spec/runners/random.ex | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 lib/spec/runners/random.ex diff --git a/lib/spec/runners/random.ex b/lib/spec/runners/random.ex new file mode 100644 index 000000000..7edcaa113 --- /dev/null +++ b/lib/spec/runners/random.ex @@ -0,0 +1,37 @@ +defmodule RandomTestRunner do + @moduledoc """ + Runner for random test cases. See: https://github.com/ethereum/consensus-specs/tree/dev/tests/formats/random + """ + + use ExUnit.CaseTemplate + use TestRunner + + @disabled_cases [ + "randomized_0", + # "randomized_1", + "randomized_2", + "randomized_3", + # "randomized_4", + "randomized_5", + "randomized_6", + "randomized_7", + # "randomized_8", + # "randomized_9", + "randomized_10", + # "randomized_11", + "randomized_12", + "randomized_13", + "randomized_14", + "randomized_15" + ] + + @impl TestRunner + def skip?(%SpecTestCase{fork: "capella", case: testcase}) do + Enum.member?(@disabled_cases, testcase) + end + + @impl TestRunner + def run_test_case(testcase) do + Helpers.ProcessBlocks.process_blocks(testcase) + end +end From 3f384cc4553466d80f75461532426cae7e658f1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 1 Dec 2023 12:04:18 -0300 Subject: [PATCH 07/14] fix: `get_head` not propagating `blocks` updates (#493) --- .../fork_choice/helpers.ex | 11 ++++++----- .../state_transition/state_transition.ex | 5 +---- lib/spec/runners/fork_choice.ex | 18 +++++++++--------- 3 files changed, 16 insertions(+), 18 deletions(-) diff --git a/lib/lambda_ethereum_consensus/fork_choice/helpers.ex b/lib/lambda_ethereum_consensus/fork_choice/helpers.ex index 3a4baaa6b..ce38e83ee 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/helpers.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/helpers.ex @@ -123,14 +123,15 @@ defmodule LambdaEthereumConsensus.ForkChoice.Helpers do # If any children branches contain expected finalized/justified checkpoints, # add to filtered block-tree and signal viability to parent. - filter_block_tree_result = Enum.map(children, &filter_block_tree(store, &1, blocks)) + {filter_block_tree_result, new_blocks} = + Enum.map_reduce(children, blocks, fn root, acc -> filter_block_tree(store, root, acc) end) cond do - Enum.any?(filter_block_tree_result, fn {b, _} -> b end) -> - {true, Map.put(blocks, block_root, block)} + Enum.any?(filter_block_tree_result) -> + {true, Map.put(new_blocks, block_root, block)} - Enum.any?(children) -> - {false, blocks} + not Enum.empty?(children) -> + {false, new_blocks} true -> filter_leaf_block(store, block_root, block, blocks) diff --git a/lib/lambda_ethereum_consensus/state_transition/state_transition.ex b/lib/lambda_ethereum_consensus/state_transition/state_transition.ex index cd190b952..0a0778d5b 100644 --- a/lib/lambda_ethereum_consensus/state_transition/state_transition.ex +++ b/lib/lambda_ethereum_consensus/state_transition/state_transition.ex @@ -15,11 +15,8 @@ defmodule LambdaEthereumConsensus.StateTransition do def state_transition( %BeaconState{} = state, %SignedBeaconBlock{message: block} = signed_block, - _validate_result + validate_result ) do - # NOTE: we aren't in a state to make validations yet - validate_result = false - state # Process slots (including those with no blocks) since block |> process_slots(block.slot) diff --git a/lib/spec/runners/fork_choice.ex b/lib/spec/runners/fork_choice.ex index 49e4b9aa9..0e6044987 100644 --- a/lib/spec/runners/fork_choice.ex +++ b/lib/spec/runners/fork_choice.ex @@ -11,7 +11,7 @@ defmodule ForkChoiceTestRunner do alias SszTypes.Store @disabled_on_block_cases [ - "basic", + # "basic", "incompatible_justification_update_end_of_epoch", "incompatible_justification_update_start_of_epoch", "justification_update_beginning_of_epoch", @@ -23,28 +23,28 @@ defmodule ForkChoiceTestRunner do "justified_update_not_realized_finality", "new_finalized_slot_is_justified_checkpoint_ancestor", "not_pull_up_current_epoch_block", - "on_block_bad_parent_root", + # "on_block_bad_parent_root", "on_block_before_finalized", "on_block_checkpoints", "on_block_finalized_skip_slots", "on_block_finalized_skip_slots_not_in_skip_chain", - "on_block_future_block", - "proposer_boost", - "proposer_boost_root_same_slot_untimely_block", + # "on_block_future_block", + # "proposer_boost", + # "proposer_boost_root_same_slot_untimely_block", "pull_up_on_tick", "pull_up_past_epoch_block" ] @disabled_ex_ante_cases [ - "ex_ante_attestations_is_greater_than_proposer_boost_with_boost", + # "ex_ante_attestations_is_greater_than_proposer_boost_with_boost", "ex_ante_sandwich_with_boost_not_sufficient", "ex_ante_sandwich_with_honest_attestation", - "ex_ante_sandwich_without_attestations", - "ex_ante_vanilla" + "ex_ante_sandwich_without_attestations" + # "ex_ante_vanilla" ] @disabled_get_head_cases [ - "chain_no_attestations", + # "chain_no_attestations", "discard_equivocations_on_attester_slashing", "discard_equivocations_slashed_validator_censoring", "filtered_block_tree", From 035e28cc697f996502a402013433151a1b488634 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 1 Dec 2023 13:33:10 -0300 Subject: [PATCH 08/14] perf: optimize `process_operations` (#491) --- .../fork_choice/handlers.ex | 4 +- .../state_transition/accessors.ex | 95 ++++----- .../state_transition/operations.ex | 192 +++++++----------- .../state_transition/predicates.ex | 32 ++- .../state_transition/state_transition.ex | 52 ++--- lib/lambda_ethereum_consensus/utils.ex | 14 +- lib/utils/bit_vector.ex | 9 + 7 files changed, 184 insertions(+), 214 deletions(-) diff --git a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex index 3bef8b753..5d06630f5 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex @@ -16,7 +16,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do Store } - import LambdaEthereumConsensus.Utils, only: [if_then_update: 3, map: 2] + import LambdaEthereumConsensus.Utils, only: [if_then_update: 3, map_ok: 2] ### Public API ### @@ -323,7 +323,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do else: {:ok, &1} ) ) - |> map( + |> map_ok( &{:ok, %Store{store | checkpoint_states: Map.put(store.checkpoint_states, target, &1)}} ) end diff --git a/lib/lambda_ethereum_consensus/state_transition/accessors.ex b/lib/lambda_ethereum_consensus/state_transition/accessors.ex index b2f2d04f5..376e28a54 100644 --- a/lib/lambda_ethereum_consensus/state_transition/accessors.ex +++ b/lib/lambda_ethereum_consensus/state_transition/accessors.ex @@ -5,6 +5,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do alias LambdaEthereumConsensus.SszEx alias LambdaEthereumConsensus.StateTransition.{Math, Misc, Predicates} + alias LambdaEthereumConsensus.Utils alias SszTypes.{Attestation, BeaconState, IndexedAttestation, SyncCommittee, Validator} @doc """ @@ -250,16 +251,12 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do """ @spec get_committee_count_per_slot(BeaconState.t(), SszTypes.epoch()) :: SszTypes.uint64() def get_committee_count_per_slot(%BeaconState{} = state, epoch) do - active_validators_count = length(get_active_validator_indices(state, epoch)) - - committee_size = - active_validators_count - |> Kernel.div(ChainSpec.get("SLOTS_PER_EPOCH")) - |> Kernel.div(ChainSpec.get("TARGET_COMMITTEE_SIZE")) - - [ChainSpec.get("MAX_COMMITTEES_PER_SLOT"), committee_size] - |> Enum.min() - |> (&max(1, &1)).() + get_active_validator_indices(state, epoch) + |> length() + |> div(ChainSpec.get("SLOTS_PER_EPOCH")) + |> div(ChainSpec.get("TARGET_COMMITTEE_SIZE")) + |> min(ChainSpec.get("MAX_COMMITTEES_PER_SLOT")) + |> max(1) end @doc """ @@ -318,6 +315,23 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do ) :: {:ok, list(SszTypes.uint64())} | {:error, binary()} def get_attestation_participation_flag_indices(state, data, inclusion_delay) do + with :ok <- check_valid_source(state, data), + {:ok, target_root} <- + get_block_root(state, data.target.epoch) |> Utils.map_err("invalid target"), + {:ok, head_root} <- + get_block_root_at_slot(state, data.slot) |> Utils.map_err("invalid head") do + is_matching_target = data.target.root == target_root + is_matching_head = is_matching_target and data.beacon_block_root == head_root + + source_indices = compute_source_indices(inclusion_delay) + target_indices = compute_target_indices(is_matching_target, inclusion_delay) + head_indices = compute_head_indices(is_matching_head, inclusion_delay) + + {:ok, Enum.concat([source_indices, target_indices, head_indices])} + end + end + + defp check_valid_source(state, data) do justified_checkpoint = if data.target.epoch == get_current_epoch(state) do state.current_justified_checkpoint @@ -325,55 +339,32 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do state.previous_justified_checkpoint end - is_matching_source = data.source == justified_checkpoint - - case {get_block_root(state, data.target.epoch), get_block_root_at_slot(state, data.slot)} do - {{:ok, block_root}, {:ok, block_root_at_slot}} -> - if is_matching_source do - is_matching_target = is_matching_source && data.target.root == block_root - source_indices = compute_source_indices(data, justified_checkpoint, inclusion_delay) - - target_indices = - compute_target_indices(data, block_root, inclusion_delay, is_matching_source) - - head_indices = - compute_head_indices(data, block_root_at_slot, inclusion_delay, is_matching_target) - - {:ok, source_indices ++ target_indices ++ head_indices} - else - {:error, "Attestation source does not match justified checkpoint"} - end - - _ -> - {:error, "Failed to get block roots"} + if data.source == justified_checkpoint do + :ok + else + {:error, "invalid source"} end end - defp compute_source_indices(data, justified_checkpoint, inclusion_delay) do - if data.source == justified_checkpoint && - inclusion_delay <= Math.integer_squareroot(ChainSpec.get("SLOTS_PER_EPOCH")) do - [Constants.timely_source_flag_index()] - else - [] - end + defp compute_source_indices(inclusion_delay) do + max_delay = ChainSpec.get("SLOTS_PER_EPOCH") |> Math.integer_squareroot() + if inclusion_delay <= max_delay, do: [Constants.timely_source_flag_index()], else: [] end - defp compute_target_indices(data, block_root, inclusion_delay, is_matching_source) do - if is_matching_source && data.target.root == block_root && - inclusion_delay <= ChainSpec.get("SLOTS_PER_EPOCH") do - [Constants.timely_target_flag_index()] - else - [] - end + defp compute_target_indices(is_matching_target, inclusion_delay) do + max_delay = ChainSpec.get("SLOTS_PER_EPOCH") + + if is_matching_target and inclusion_delay <= max_delay, + do: [Constants.timely_target_flag_index()], + else: [] end - defp compute_head_indices(data, block_root_at_slot, inclusion_delay, is_matching_target) do - if is_matching_target && data.beacon_block_root == block_root_at_slot && - inclusion_delay == ChainSpec.get("MIN_ATTESTATION_INCLUSION_DELAY") do - [Constants.timely_head_flag_index()] - else - [] - end + defp compute_head_indices(is_matching_head, inclusion_delay) do + min_inclusion_delay = ChainSpec.get("MIN_ATTESTATION_INCLUSION_DELAY") + + if is_matching_head and inclusion_delay == min_inclusion_delay, + do: [Constants.timely_head_flag_index()], + else: [] end @doc """ diff --git a/lib/lambda_ethereum_consensus/state_transition/operations.ex b/lib/lambda_ethereum_consensus/state_transition/operations.ex index 3d0a0766d..b5bf22ba5 100644 --- a/lib/lambda_ethereum_consensus/state_transition/operations.ex +++ b/lib/lambda_ethereum_consensus/state_transition/operations.ex @@ -645,18 +645,10 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do @spec process_attestation(BeaconState.t(), Attestation.t()) :: {:ok, BeaconState.t()} | {:error, binary()} def process_attestation(state, attestation) do - case verify_attestation_for_process(state, attestation) do - {:ok, _} -> - data = attestation.data - aggregation_bits = attestation.aggregation_bits - - case process_attestation(state, data, aggregation_bits) do - {:ok, updated_state} -> {:ok, updated_state} - {:error, reason} -> {:error, reason} - end - - {:error, reason} -> - {:error, reason} + # TODO: optimize (takes ~3s) + with :ok <- verify_attestation_for_process(state, attestation) do + # TODO: optimize (takes ~1s) + process_attestation(state, attestation.data, attestation.aggregation_bits) end end @@ -672,7 +664,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do is_current_epoch = data.target.epoch == Accessors.get_current_epoch(state) initial_epoch_participation = get_initial_epoch_participation(state, is_current_epoch) - {proposer_reward_numerator, updated_epoch_participation} = + {updated_epoch_participation, proposer_reward_numerator} = update_epoch_participation( state, attesting_indices, @@ -684,17 +676,10 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do {:ok, proposer_index} = Accessors.get_beacon_proposer_index(state) - bal_updated_state = - Mutators.increase_balance( - state, - proposer_index, - proposer_reward - ) - - updated_state = - update_state(bal_updated_state, is_current_epoch, updated_epoch_participation) - - {:ok, updated_state} + state + |> Mutators.increase_balance(proposer_index, proposer_reward) + |> update_state(is_current_epoch, updated_epoch_participation) + |> then(&{:ok, &1}) else {:error, reason} -> {:error, reason} end @@ -709,31 +694,35 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do initial_epoch_participation, participation_flag_indices ) do - Enum.reduce(attesting_indices, {0, initial_epoch_participation}, fn index, {acc, ep} -> - update_participation_for_index(state, index, acc, ep, participation_flag_indices) + weights = + Constants.participation_flag_weights() + |> Stream.with_index() + |> Enum.filter(&(elem(&1, 1) in participation_flag_indices)) + + base_reward_per_increment = Accessors.get_base_reward_per_increment(state) + + state.validators + |> Stream.zip(initial_epoch_participation) + |> Stream.with_index() + |> Enum.map_reduce(0, fn {{validator, participation}, i}, acc -> + if MapSet.member?(attesting_indices, i) do + base_reward = Accessors.get_base_reward(validator, base_reward_per_increment) + update_participation(participation, acc, base_reward, weights) + else + {participation, acc} + end end) end - defp update_participation_for_index(state, index, acc, ep, participation_flag_indices) do - Enum.reduce_while( - 0..(length(Constants.participation_flag_weights()) - 1), - {acc, ep}, - fn flag_index, {inner_acc, inner_ep} -> - if flag_index in participation_flag_indices && - not Predicates.has_flag(Enum.at(inner_ep, index), flag_index) do - updated_ep = - List.replace_at(inner_ep, index, Misc.add_flag(Enum.at(inner_ep, index), flag_index)) - - acc_delta = - Accessors.get_base_reward(state, index) * - Enum.at(Constants.participation_flag_weights(), flag_index) - - {:cont, {inner_acc + acc_delta, updated_ep}} - else - {:cont, {inner_acc, inner_ep}} - end - end - ) + defp update_participation(participation, acc, base_reward, weights) do + bv_participation = BitVector.new(participation, 8) + + weights + |> Stream.reject(&BitVector.set?(bv_participation, elem(&1, 1))) + |> Enum.reduce({bv_participation, acc}, fn {weight, index}, {bv_participation, acc} -> + {bv_participation |> BitVector.set(index), acc + base_reward * weight} + end) + |> then(fn {p, acc} -> {BitVector.to_integer(p), acc} end) end defp compute_proposer_reward(proposer_reward_numerator) do @@ -751,16 +740,31 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do defp update_state(state, false, updated_epoch_participation), do: %{state | previous_epoch_participation: updated_epoch_participation} - def verify_attestation_for_process(state, attestation) do - data = attestation.data + def verify_attestation_for_process(state, %Attestation{data: data} = attestation) do + with {:ok, beacon_committee} <- Accessors.get_beacon_committee(state, data.slot, data.index), + {:ok, indexed_attestation} <- Accessors.get_indexed_attestation(state, attestation) do + cond do + invalid_target_epoch?(data, state) -> + {:error, "Invalid target epoch"} - beacon_committee = fetch_beacon_committee(state, data) - indexed_attestation = fetch_indexed_attestation(state, attestation) + epoch_mismatch?(data) -> + {:error, "Epoch mismatch"} - if has_invalid_conditions?(data, state, beacon_committee, indexed_attestation, attestation) do - {:error, get_error_message(data, state, beacon_committee, indexed_attestation, attestation)} - else - {:ok, "Valid"} + invalid_slot_range?(data, state) -> + {:error, "Invalid slot range"} + + exceeds_committee_count?(data, state) -> + {:error, "Index exceeds committee count"} + + mismatched_aggregation_bits_length?(attestation, beacon_committee) -> + {:error, "Mismatched aggregation bits length"} + + not valid_signature?(state, indexed_attestation) -> + {:error, "Invalid signature"} + + true -> + :ok + end end end @@ -831,55 +835,6 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do end end - defp has_invalid_conditions?(data, state, beacon_committee, indexed_attestation, attestation) do - invalid_target_epoch?(data, state) || - epoch_mismatch?(data) || - invalid_slot_range?(data, state) || - exceeds_committee_count?(data, state) || - !beacon_committee || !indexed_attestation || - mismatched_aggregation_bits_length?(attestation, beacon_committee) || - invalid_signature?(state, indexed_attestation) - end - - defp get_error_message(data, state, beacon_committee, indexed_attestation, attestation) do - cond do - invalid_target_epoch?(data, state) -> - "Invalid target epoch" - - epoch_mismatch?(data) -> - "Epoch mismatch" - - invalid_slot_range?(data, state) -> - "Invalid slot range" - - exceeds_committee_count?(data, state) -> - "Index exceeds committee count" - - !beacon_committee || !indexed_attestation -> - "Indexing error at beacon committee" - - mismatched_aggregation_bits_length?(attestation, beacon_committee) -> - "Mismatched aggregation bits length" - - invalid_signature?(state, indexed_attestation) -> - "Invalid signature" - end - end - - defp fetch_beacon_committee(state, data) do - case Accessors.get_beacon_committee(state, data.slot, data.index) do - {:ok, committee} -> committee - {:error, _reason} -> nil - end - end - - defp fetch_indexed_attestation(state, attestation) do - case Accessors.get_indexed_attestation(state, attestation) do - {:ok, indexed_attestation} -> indexed_attestation - {:error, _reason} -> nil - end - end - defp invalid_target_epoch?(data, state) do data.target.epoch < Accessors.get_previous_epoch(state) || data.target.epoch > Accessors.get_current_epoch(state) @@ -902,8 +857,8 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do length_of_bitstring(attestation.aggregation_bits) - 1 != length(beacon_committee) end - defp invalid_signature?(state, indexed_attestation) do - not Predicates.is_valid_indexed_attestation(state, indexed_attestation) + defp valid_signature?(state, indexed_attestation) do + Predicates.is_valid_indexed_attestation(state, indexed_attestation) end defp length_of_bitstring(binary) when is_binary(binary) do @@ -996,25 +951,20 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do def process_operations(state, body) do # Ensure that outstanding deposits are processed up to the maximum number of deposits with :ok <- verify_deposits(state, body) do - # Define a function that iterates over a list of operations and applies a given function to each element - updated_state = - state - |> for_ops(body.proposer_slashings, &process_proposer_slashing/2) - |> for_ops(body.attester_slashings, &process_attester_slashing/2) - |> for_ops(body.attestations, &process_attestation/2) - |> for_ops(body.deposits, &process_deposit/2) - |> for_ops(body.voluntary_exits, &process_voluntary_exit/2) - |> for_ops(body.bls_to_execution_changes, &process_bls_to_execution_change/2) - - {:ok, updated_state} + {:ok, state} + |> for_ops(body.proposer_slashings, &process_proposer_slashing/2) + |> for_ops(body.attester_slashings, &process_attester_slashing/2) + |> for_ops(body.attestations, &process_attestation/2) + |> for_ops(body.deposits, &process_deposit/2) + |> for_ops(body.voluntary_exits, &process_voluntary_exit/2) + |> for_ops(body.bls_to_execution_changes, &process_bls_to_execution_change/2) end end - defp for_ops(state, operations, func) do - Enum.reduce(operations, state, fn operation, acc -> - with {:ok, state} <- func.(acc, operation) do - state - end + defp for_ops(acc, operations, func) do + Enum.reduce_while(operations, acc, fn + operation, {:ok, state} -> {:cont, func.(state, operation)} + _, {:error, reason} -> {:halt, {:error, reason}} end) end diff --git a/lib/lambda_ethereum_consensus/state_transition/predicates.ex b/lib/lambda_ethereum_consensus/state_transition/predicates.ex index 73ffe288e..189f01277 100644 --- a/lib/lambda_ethereum_consensus/state_transition/predicates.ex +++ b/lib/lambda_ethereum_consensus/state_transition/predicates.ex @@ -145,7 +145,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Predicates do def is_valid_indexed_attestation(state, indexed_attestation) do indices = indexed_attestation.attesting_indices - if Enum.empty?(indices) or not (indices == indices |> Enum.uniq() |> Enum.sort()) do + if Enum.empty?(indices) or not uniq_and_sorted?(indices) do false else domain_type = Constants.domain_beacon_attester() @@ -155,17 +155,27 @@ defmodule LambdaEthereumConsensus.StateTransition.Predicates do Accessors.get_domain(state, domain_type, epoch) |> then(&Misc.compute_signing_root(indexed_attestation.data, &1)) - res = - state.validators - |> Stream.with_index() - |> Stream.filter(fn {_, i} -> Enum.member?(indices, i) end) - |> Enum.map(fn {%{pubkey: p}, _} -> p end) - |> Bls.fast_aggregate_verify(signing_root, indexed_attestation.signature) - - case res do - {:ok, r} -> r + state.validators + |> Stream.with_index() + |> Enum.flat_map_reduce( + indices, + fn + {%Validator{pubkey: p}, i}, [i | acc] -> {[p], acc} + _, acc -> {[], acc} + end + ) + |> then(fn + {pks, []} -> pks |> Bls.fast_aggregate_verify(signing_root, indexed_attestation.signature) + {_, _} -> {:error, "invalid indices"} + end) + |> then(fn + {:ok, b} -> b {:error, _} -> false - end + end) end end + + defp uniq_and_sorted?([]), do: true + defp uniq_and_sorted?([a, b | _]) when a >= b, do: false + defp uniq_and_sorted?([_ | tail]), do: uniq_and_sorted?(tail) end diff --git a/lib/lambda_ethereum_consensus/state_transition/state_transition.ex b/lib/lambda_ethereum_consensus/state_transition/state_transition.ex index 0a0778d5b..fcce6062d 100644 --- a/lib/lambda_ethereum_consensus/state_transition/state_transition.ex +++ b/lib/lambda_ethereum_consensus/state_transition/state_transition.ex @@ -8,7 +8,7 @@ defmodule LambdaEthereumConsensus.StateTransition do alias LambdaEthereumConsensus.StateTransition.{EpochProcessing, Operations} alias SszTypes.{BeaconBlockHeader, BeaconState, SignedBeaconBlock} - import LambdaEthereumConsensus.Utils, only: [map: 2] + import LambdaEthereumConsensus.Utils, only: [map_ok: 2] @spec state_transition(BeaconState.t(), SignedBeaconBlock.t(), boolean()) :: {:ok, BeaconState.t()} | {:error, String.t()} @@ -21,7 +21,7 @@ defmodule LambdaEthereumConsensus.StateTransition do # Process slots (including those with no blocks) since block |> process_slots(block.slot) # Verify signature - |> map(fn st -> + |> map_ok(fn st -> if not validate_result or verify_block_signature(st, signed_block) do {:ok, st} else @@ -29,9 +29,9 @@ defmodule LambdaEthereumConsensus.StateTransition do end end) # Process block - |> map(&process_block(&1, block)) + |> map_ok(&process_block(&1, block)) # Verify state root - |> map(fn st -> + |> map_ok(fn st -> if not validate_result or block.state_root == Ssz.hash_tree_root!(st) do {:ok, st} else @@ -48,10 +48,10 @@ defmodule LambdaEthereumConsensus.StateTransition do Enum.reduce((old_slot + 1)..slot//1, {:ok, state}, fn next_slot, acc -> acc - |> map(&process_slot/1) + |> map_ok(&process_slot/1) # Process epoch on the start slot of the next epoch - |> map(&maybe_process_epoch(&1, rem(next_slot, slots_per_epoch))) - |> map(&{:ok, %BeaconState{&1 | slot: next_slot}}) + |> map_ok(&maybe_process_epoch(&1, rem(next_slot, slots_per_epoch))) + |> map_ok(&{:ok, %BeaconState{&1 | slot: next_slot}}) end) end @@ -88,17 +88,17 @@ defmodule LambdaEthereumConsensus.StateTransition do defp process_epoch(%BeaconState{} = state) do state |> EpochProcessing.process_justification_and_finalization() - |> map(&EpochProcessing.process_inactivity_updates/1) - |> map(&EpochProcessing.process_rewards_and_penalties/1) - |> map(&EpochProcessing.process_registry_updates/1) - |> map(&EpochProcessing.process_slashings/1) - |> map(&EpochProcessing.process_eth1_data_reset/1) - |> map(&EpochProcessing.process_effective_balance_updates/1) - |> map(&EpochProcessing.process_slashings_reset/1) - |> map(&EpochProcessing.process_randao_mixes_reset/1) - |> map(&EpochProcessing.process_historical_summaries_update/1) - |> map(&EpochProcessing.process_participation_flag_updates/1) - |> map(&EpochProcessing.process_sync_committee_updates/1) + |> map_ok(&EpochProcessing.process_inactivity_updates/1) + |> map_ok(&EpochProcessing.process_rewards_and_penalties/1) + |> map_ok(&EpochProcessing.process_registry_updates/1) + |> map_ok(&EpochProcessing.process_slashings/1) + |> map_ok(&EpochProcessing.process_eth1_data_reset/1) + |> map_ok(&EpochProcessing.process_effective_balance_updates/1) + |> map_ok(&EpochProcessing.process_slashings_reset/1) + |> map_ok(&EpochProcessing.process_randao_mixes_reset/1) + |> map_ok(&EpochProcessing.process_historical_summaries_update/1) + |> map_ok(&EpochProcessing.process_participation_flag_updates/1) + |> map_ok(&EpochProcessing.process_sync_committee_updates/1) end def verify_block_signature(%BeaconState{} = state, %SignedBeaconBlock{} = signed_block) do @@ -118,12 +118,14 @@ defmodule LambdaEthereumConsensus.StateTransition do verify_and_notify_new_payload = &Execution.verify_and_notify_new_payload/1 {:ok, state} - |> map(&Operations.process_block_header(&1, block)) - |> map(&Operations.process_withdrawals(&1, block.body.execution_payload)) - |> map(&Operations.process_execution_payload(&1, block.body, verify_and_notify_new_payload)) - |> map(&Operations.process_randao(&1, block.body)) - |> map(&Operations.process_eth1_data(&1, block.body)) - |> map(&Operations.process_operations(&1, block.body)) - |> map(&Operations.process_sync_aggregate(&1, block.body.sync_aggregate)) + |> map_ok(&Operations.process_block_header(&1, block)) + |> map_ok(&Operations.process_withdrawals(&1, block.body.execution_payload)) + |> map_ok( + &Operations.process_execution_payload(&1, block.body, verify_and_notify_new_payload) + ) + |> map_ok(&Operations.process_randao(&1, block.body)) + |> map_ok(&Operations.process_eth1_data(&1, block.body)) + |> map_ok(&Operations.process_operations(&1, block.body)) + |> map_ok(&Operations.process_sync_aggregate(&1, block.body.sync_aggregate)) end end diff --git a/lib/lambda_ethereum_consensus/utils.ex b/lib/lambda_ethereum_consensus/utils.ex index ce525e277..f81c049d4 100644 --- a/lib/lambda_ethereum_consensus/utils.ex +++ b/lib/lambda_ethereum_consensus/utils.ex @@ -27,7 +27,15 @@ defmodule LambdaEthereumConsensus.Utils do If first arg is an ``{:ok, value}`` tuple, apply ``fun`` to ``value`` and return the result. Else, if it's an ``{:error, _}`` tuple, returns it. """ - @spec map({:ok | :error, any()}, (any() -> any())) :: any() | {:error, any()} - def map({:ok, value}, fun), do: fun.(value) - def map({:error, _} = err, _fun), do: err + @spec map_ok({:ok | :error, any()}, (any() -> any())) :: any() | {:error, any()} + def map_ok({:ok, value}, fun), do: fun.(value) + def map_ok({:error, _} = err, _fun), do: err + + @doc """ + If first arg is an ``{:error, reason}`` tuple, replace ``reason`` with + ``new_reason``. Else, return the first arg unmodified. + """ + @spec map_err(any() | {:error, String.t()}, String.t()) :: any() | {:error, String.t()} + def map_err({:error, _}, reason), do: {:error, reason} + def map_err(v, _), do: v end diff --git a/lib/utils/bit_vector.ex b/lib/utils/bit_vector.ex index 5225e1f27..4d552bea1 100644 --- a/lib/utils/bit_vector.ex +++ b/lib/utils/bit_vector.ex @@ -21,6 +21,15 @@ defmodule LambdaEthereumConsensus.Utils.BitVector do b end + @doc """ + Turns the bit_vector into an integer. + """ + @spec to_integer(t) :: non_neg_integer() + def to_integer(bit_vector) do + <> = bit_vector + int + end + @doc """ True if a single bit is set to 1. Equivalent to bit_vector[index] == 1. From 43c0ca6fffa3641f6286b5fa53a8f6060e8046fd Mon Sep 17 00:00:00 2001 From: Godspower Eze <61994334+Godspower-Eze@users.noreply.github.com> Date: Tue, 5 Dec 2023 13:31:45 +0100 Subject: [PATCH 09/14] feat: use libp2p raw tracer functionality (#457) --- lib/lambda_ethereum_consensus/telemetry.ex | 9 + lib/libp2p_port.ex | 73 +- .../grafana/provisioning/dashboards/home.json | 820 +++++++++++++++++- .../internal/proto_helpers/proto_helpers.go | 66 ++ .../internal/subscriptions/subscriptions.go | 77 ++ proto/libp2p.proto | 58 ++ 6 files changed, 1093 insertions(+), 10 deletions(-) diff --git a/lib/lambda_ethereum_consensus/telemetry.ex b/lib/lambda_ethereum_consensus/telemetry.ex index 26c89d79f..79920d62e 100644 --- a/lib/lambda_ethereum_consensus/telemetry.ex +++ b/lib/lambda_ethereum_consensus/telemetry.ex @@ -59,6 +59,15 @@ defmodule LambdaEthereumConsensus.Telemetry do counter("peers.connection.count", tags: [:result]), counter("peers.challenge.count", tags: [:result]), counter("network.request.count", tags: [:result, :type, :reason]), + counter("network.pubsub_peers.count", tags: [:result]), + sum("network.pubsub_topic_active.active", tags: [:topic]), + counter("network.pubsub_topics_graft.count", tags: [:topic]), + counter("network.pubsub_topics_prune.count", tags: [:topic]), + counter("network.pubsub_topics_deliver_message.count", tags: [:topic]), + counter("network.pubsub_topics_duplicate_message.count", tags: [:topic]), + counter("network.pubsub_topics_reject_message.count", tags: [:topic]), + counter("network.pubsub_topics_un_deliverable_message.count", tags: [:topic]), + counter("network.pubsub_topics_validate_message.count", tags: [:topic]), counter("port.message.count", tags: [:function, :direction]), sum("network.request.blocks", tags: [:result, :type, :reason]), diff --git a/lib/libp2p_port.ex b/lib/libp2p_port.ex index d77abd552..32593996d 100644 --- a/lib/libp2p_port.ex +++ b/lib/libp2p_port.ex @@ -1,8 +1,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do @moduledoc """ A GenServer that allows other elixir processes to send and receive commands to/from - the LibP2P server in Go. For now, it only supports subscribing and unsubscribing from - topics. + the LibP2P server in Go. Requests are generated with an ID, which is returned when calling. Those IDs appear in the responses that might be listened to by other processes. @@ -26,6 +25,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do SendResponse, SetHandler, SubscribeToTopic, + Tracer, UnsubscribeFromTopic, ValidateMessage } @@ -310,6 +310,68 @@ defmodule LambdaEthereumConsensus.Libp2pPort do send(pid, {:response, result}) end + defp handle_notification(%Tracer{t: {:add_peer, %{}}}, _state) do + :telemetry.execute([:network, :pubsub_peers], %{}, %{ + result: "add" + }) + end + + defp handle_notification(%Tracer{t: {:remove_peer, %{}}}, _state) do + :telemetry.execute([:network, :pubsub_peers], %{}, %{ + result: "remove" + }) + end + + defp handle_notification(%Tracer{t: {:joined, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topic_active], %{active: 1}, %{ + topic: get_topic_name(topic) + }) + end + + defp handle_notification(%Tracer{t: {:left, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topic_active], %{active: -1}, %{ + topic: get_topic_name(topic) + }) + end + + defp handle_notification(%Tracer{t: {:grafted, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topics_graft], %{}, %{topic: get_topic_name(topic)}) + end + + defp handle_notification(%Tracer{t: {:pruned, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topics_prune], %{}, %{topic: get_topic_name(topic)}) + end + + defp handle_notification(%Tracer{t: {:deliver_message, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topics_deliver_message], %{}, %{ + topic: get_topic_name(topic) + }) + end + + defp handle_notification(%Tracer{t: {:duplicate_message, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topics_duplicate_message], %{}, %{ + topic: get_topic_name(topic) + }) + end + + defp handle_notification(%Tracer{t: {:reject_message, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topics_reject_message], %{}, %{ + topic: get_topic_name(topic) + }) + end + + defp handle_notification(%Tracer{t: {:un_deliverable_message, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topics_un_deliverable_message], %{}, %{ + topic: get_topic_name(topic) + }) + end + + defp handle_notification(%Tracer{t: {:validate_message, %{topic: topic}}}, _state) do + :telemetry.execute([:network, :pubsub_topics_validate_message], %{}, %{ + topic: get_topic_name(topic) + }) + end + defp parse_args(args) do args |> Keyword.validate!(@default_args) @@ -339,4 +401,11 @@ defmodule LambdaEthereumConsensus.Libp2pPort do {:response, {res, %ResultMessage{message: message}}} -> [res | message] |> List.to_tuple() end end + + defp get_topic_name(topic) do + case topic |> String.split("/") |> Enum.fetch(3) do + {:ok, name} -> name + :error -> topic + end + end end diff --git a/metrics/grafana/provisioning/dashboards/home.json b/metrics/grafana/provisioning/dashboards/home.json index 17eadf027..feda66b71 100644 --- a/metrics/grafana/provisioning/dashboards/home.json +++ b/metrics/grafana/provisioning/dashboards/home.json @@ -67,9 +67,7 @@ "minVizWidth": 75, "orientation": "auto", "reduceOptions": { - "calcs": [ - "lastNotNull" - ], + "calcs": ["lastNotNull"], "fields": "", "values": false }, @@ -130,9 +128,7 @@ }, "pieType": "pie", "reduceOptions": { - "calcs": [ - "lastNotNull" - ], + "calcs": ["lastNotNull"], "fields": "", "values": false }, @@ -546,7 +542,7 @@ }, "gridPos": { "h": 6, - "w": 12, + "w": 24, "x": 0, "y": 12 }, @@ -897,7 +893,815 @@ "refId": "D" } ], - "title": "Libp2pPort messages", + "title": "Libp2pPort Messages", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 24, + "x": 12, + "y": 12 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "sum(network_pubsub_peers_count{result=\"add\"}) - sum(network_pubsub_peers_count{result=\"remove\"})", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "__auto", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Peers (Gossip)", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 24, + "x": 12, + "y": 12 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "network_pubsub_topic_active_active", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "{{topic}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Topics Activity", + "type": "heatmap" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 18 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "network_pubsub_topics_graft_count{} - network_pubsub_topics_prune_count{}", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "{{topic}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Grafted", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 18 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "rate(network_pubsub_topics_deliver_message_count{}[$__rate_interval])", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "{{topic}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Deliver Messages", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 0, + "y": 18 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "rate(network_pubsub_topics_duplicate_message_count{}[$__rate_interval])", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "{{topic}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Duplicate Messages", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 0, + "y": 18 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "rate(network_pubsub_topics_reject_message_count{}[$__rate_interval])", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "{{topic}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Reject Messages", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 18 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "rate(network_pubsub_topics_un_deliverable_message_count{}[$__rate_interval])", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "{{topic}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Undeliverable Messages", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 18 + }, + "id": 12, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "PBFA97CFB590B2093" + }, + "disableTextWrap": false, + "editorMode": "code", + "exemplar": false, + "expr": "rate(network_pubsub_topics_validate_message_count{}[$__rate_interval])", + "fullMetaSearch": false, + "includeNullMetadata": true, + "instant": false, + "interval": "", + "legendFormat": "{{topic}}", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Validate Messages", "type": "timeseries" } ], diff --git a/native/libp2p_port/internal/proto_helpers/proto_helpers.go b/native/libp2p_port/internal/proto_helpers/proto_helpers.go index 5d63947b6..9fb9b6888 100644 --- a/native/libp2p_port/internal/proto_helpers/proto_helpers.go +++ b/native/libp2p_port/internal/proto_helpers/proto_helpers.go @@ -20,6 +20,72 @@ func ConfigFromInitArgs(initArgs *proto_defs.InitArgs) Config { } } +func AddPeerNotification() proto_defs.Notification { + addPeerNotification := &proto_defs.AddPeerGossip{} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_AddPeer{AddPeer: addPeerNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func RemovePeerNotification() proto_defs.Notification { + removePeerNotification := &proto_defs.RemovePeerGossip{} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_RemovePeer{RemovePeer: removePeerNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func JoinNotification(topic string) proto_defs.Notification { + joinNotification := &proto_defs.Join{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_Joined{Joined: joinNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func LeaveNofication(topic string) proto_defs.Notification { + leaveNofication := &proto_defs.Leave{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_Left{Left: leaveNofication}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func GraftNotification(topic string) proto_defs.Notification { + graftNotification := &proto_defs.Graft{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_Grafted{Grafted: graftNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func PruneNotification(topic string) proto_defs.Notification { + pruneNotification := &proto_defs.Prune{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_Pruned{Pruned: pruneNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func ValidateMessageNotification(topic string) proto_defs.Notification { + validateMessageNotification := &proto_defs.ValidateMessageGossip{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_ValidateMessage{ValidateMessage: validateMessageNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func DeliverMessageNotification(topic string) proto_defs.Notification { + deliverMessageNotification := &proto_defs.DeliverMessage{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_DeliverMessage{DeliverMessage: deliverMessageNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func UndeliverableMessageNotification(topic string) proto_defs.Notification { + unDeliverableMessageNotification := &proto_defs.UnDeliverableMessage{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_UnDeliverableMessage{UnDeliverableMessage: unDeliverableMessageNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func RejectMessageNotification(topic string) proto_defs.Notification { + rejectMessageNotification := &proto_defs.RejectMessage{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_RejectMessage{RejectMessage: rejectMessageNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + +func DuplicateMessageNotification(topic string) proto_defs.Notification { + duplicateMessageNotification := &proto_defs.DuplicateMessage{Topic: topic} + tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_DuplicateMessage{DuplicateMessage: duplicateMessageNotification}} + return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}} +} + func GossipNotification(topic string, handler, msgId, message []byte) proto_defs.Notification { gossipSubNotification := &proto_defs.GossipSub{Topic: []byte(topic), Handler: handler, MsgId: msgId, Message: message} return proto_defs.Notification{N: &proto_defs.Notification_Gossip{Gossip: gossipSubNotification}} diff --git a/native/libp2p_port/internal/subscriptions/subscriptions.go b/native/libp2p_port/internal/subscriptions/subscriptions.go index 74e526d6c..bdd69de95 100644 --- a/native/libp2p_port/internal/subscriptions/subscriptions.go +++ b/native/libp2p_port/internal/subscriptions/subscriptions.go @@ -14,6 +14,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" ) type subscription struct { @@ -28,6 +29,81 @@ type Subscriber struct { port *port.Port } +type GossipTracer struct { + port *port.Port +} + +func (g GossipTracer) AddPeer(p peer.ID, proto protocol.ID) { + notification := proto_helpers.AddPeerNotification() + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) RemovePeer(p peer.ID) { + notification := proto_helpers.RemovePeerNotification() + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) Join(topic string) { + notification := proto_helpers.JoinNotification(topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) Leave(topic string) { + notification := proto_helpers.LeaveNofication(topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) Graft(p peer.ID, topic string) { + notification := proto_helpers.GraftNotification(topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) Prune(p peer.ID, topic string) { + notification := proto_helpers.PruneNotification(topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) ValidateMessage(msg *pubsub.Message) { + notification := proto_helpers.ValidateMessageNotification(*msg.Topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) DeliverMessage(msg *pubsub.Message) { + notification := proto_helpers.DeliverMessageNotification(*msg.Topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) UndeliverableMessage(msg *pubsub.Message) { + notification := proto_helpers.UndeliverableMessageNotification(*msg.Topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) RejectMessage(msg *pubsub.Message, reason string) { + notification := proto_helpers.RejectMessageNotification(*msg.Topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) DuplicateMessage(msg *pubsub.Message) { + notification := proto_helpers.DuplicateMessageNotification(*msg.Topic) + g.port.SendNotification(¬ification) +} + +func (g GossipTracer) ThrottlePeer(p peer.ID) { + // no-op +} + +func (g GossipTracer) RecvRPC(rpc *pubsub.RPC) { + // no-op +} + +func (g GossipTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) { + // no-op +} + +func (g GossipTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) { + // no-op +} + func NewSubscriber(p *port.Port, h host.Host) Subscriber { heartbeat := 700 * time.Millisecond gsubParams := pubsub.DefaultGossipSubParams() @@ -74,6 +150,7 @@ func NewSubscriber(p *port.Port, h host.Host) Subscriber { pubsub.WithPeerOutboundQueueSize(600), pubsub.WithValidateQueueSize(600), pubsub.WithMaxMessageSize(10 * (1 << 20)), // 10 MB + pubsub.WithRawTracer(GossipTracer{port: p}), } gsub, err := pubsub.NewGossipSub(context.Background(), h, options...) diff --git a/proto/libp2p.proto b/proto/libp2p.proto index a1b65f867..936852217 100644 --- a/proto/libp2p.proto +++ b/proto/libp2p.proto @@ -27,6 +27,47 @@ message UnsubscribeFromTopic { string name = 1; } +message AddPeerGossip {} +message RemovePeerGossip {} + +message Join { + // topic that was joined + string topic = 1; +} + +message Leave { + // topic that was abandoned + string topic = 1; +} + +message Graft { + string topic = 1; +} + +message Prune { + string topic = 1; +} + +message ValidateMessageGossip { + string topic = 1; +} + +message DeliverMessage { + string topic = 1; +} + +message UnDeliverableMessage { + string topic = 1; +} + +message RejectMessage { + string topic = 1; +} + +message DuplicateMessage { + string topic = 1; +} + message AddPeer { bytes id = 1; repeated string addrs = 2; @@ -117,11 +158,28 @@ message Result { } } +message Tracer { + oneof t { + Join joined = 1; + Leave left = 2; + Graft grafted = 3; + Prune pruned = 4; + ValidateMessageGossip validate_message = 5; + DeliverMessage deliver_message = 6; + UnDeliverableMessage un_deliverable_message = 7; + RejectMessage reject_message = 8; + DuplicateMessage duplicate_message = 9; + AddPeerGossip add_peer = 10; + RemovePeerGossip remove_peer = 11; + } +} + message Notification { oneof n { GossipSub gossip = 1; Request request = 2; NewPeer new_peer = 3; Result result = 4; + Tracer tracer = 5; } } From b9615e8a40fecc24aef9ef8d6b0ec769cf69ca2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 5 Dec 2023 12:56:13 -0300 Subject: [PATCH 10/14] fix: don't pre-calculate hashes in `weigh_justification_and_finalization` (#497) --- .../fork_choice/handlers.ex | 37 +++--- .../state_transition/accessors.ex | 8 +- .../state_transition/epoch_processing.ex | 122 +++++------------- lib/spec/runners/fork_choice.ex | 50 ++++--- lib/spec/runners/sanity.ex | 26 ++-- 5 files changed, 93 insertions(+), 150 deletions(-) diff --git a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex index 5d06630f5..509e468a4 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex @@ -172,7 +172,6 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do |> update_checkpoints(state.current_justified_checkpoint, state.finalized_checkpoint) # Eagerly compute unrealized justification and finality |> compute_pulled_up_tip(block_root) - |> then(&{:ok, &1}) end end @@ -216,27 +215,29 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do end) end + # Pull up the post-state of the block to the next epoch boundary def compute_pulled_up_tip(%Store{block_states: states} = store, block_root) do - # Pull up the post-state of the block to the next epoch boundary - # TODO: handle possible errors - {:ok, state} = EpochProcessing.process_justification_and_finalization(states[block_root]) + result = EpochProcessing.process_justification_and_finalization(states[block_root]) - block_epoch = Misc.compute_epoch_at_slot(store.blocks[block_root].slot) - current_epoch = store |> Store.get_current_slot() |> Misc.compute_epoch_at_slot() + with {:ok, state} <- result do + block_epoch = Misc.compute_epoch_at_slot(store.blocks[block_root].slot) + current_epoch = store |> Store.get_current_slot() |> Misc.compute_epoch_at_slot() - unrealized_justifications = - Map.put(store.unrealized_justifications, block_root, state.current_justified_checkpoint) + unrealized_justifications = + Map.put(store.unrealized_justifications, block_root, state.current_justified_checkpoint) - %Store{store | unrealized_justifications: unrealized_justifications} - |> update_unrealized_checkpoints( - state.current_justified_checkpoint, - state.finalized_checkpoint - ) - |> if_then_update( - block_epoch < current_epoch, - # If the block is from a prior epoch, apply the realized values - &update_checkpoints(&1, state.current_justified_checkpoint, state.finalized_checkpoint) - ) + %Store{store | unrealized_justifications: unrealized_justifications} + |> update_unrealized_checkpoints( + state.current_justified_checkpoint, + state.finalized_checkpoint + ) + |> if_then_update( + block_epoch < current_epoch, + # If the block is from a prior epoch, apply the realized values + &update_checkpoints(&1, state.current_justified_checkpoint, state.finalized_checkpoint) + ) + |> then(&{:ok, &1}) + end end # Update unrealized checkpoints in store if necessary diff --git a/lib/lambda_ethereum_consensus/state_transition/accessors.ex b/lib/lambda_ethereum_consensus/state_transition/accessors.ex index 376e28a54..1399d7283 100644 --- a/lib/lambda_ethereum_consensus/state_transition/accessors.ex +++ b/lib/lambda_ethereum_consensus/state_transition/accessors.ex @@ -120,7 +120,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do Return the current epoch. """ @spec get_current_epoch(BeaconState.t()) :: SszTypes.epoch() - def get_current_epoch(%BeaconState{slot: slot} = _state) do + def get_current_epoch(%BeaconState{slot: slot}) do Misc.compute_epoch_at_slot(slot) end @@ -373,8 +373,10 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do @spec get_block_root_at_slot(BeaconState.t(), SszTypes.slot()) :: {:ok, SszTypes.root()} | {:error, binary()} def get_block_root_at_slot(state, slot) do - if slot < state.slot and state.slot <= slot + ChainSpec.get("SLOTS_PER_HISTORICAL_ROOT") do - root = Enum.at(state.block_roots, rem(slot, ChainSpec.get("SLOTS_PER_HISTORICAL_ROOT"))) + slots_per_historical_root = ChainSpec.get("SLOTS_PER_HISTORICAL_ROOT") + + if slot < state.slot and state.slot <= slot + slots_per_historical_root do + root = Enum.at(state.block_roots, rem(slot, slots_per_historical_root)) {:ok, root} else {:error, "Block root not available"} diff --git a/lib/lambda_ethereum_consensus/state_transition/epoch_processing.ex b/lib/lambda_ethereum_consensus/state_transition/epoch_processing.ex index 9773582b7..fe31f6d41 100644 --- a/lib/lambda_ethereum_consensus/state_transition/epoch_processing.ex +++ b/lib/lambda_ethereum_consensus/state_transition/epoch_processing.ex @@ -364,8 +364,6 @@ defmodule LambdaEthereumConsensus.StateTransition.EpochProcessing do previous_target_balance, current_target_balance ) - else - {:error, reason} -> {:error, reason} end end end @@ -373,55 +371,28 @@ defmodule LambdaEthereumConsensus.StateTransition.EpochProcessing do defp weigh_justification_and_finalization( state, total_active_balance, - previous_epoch_target_balance, - current_epoch_target_balance + previous_target_balance, + current_target_balance ) do previous_epoch = Accessors.get_previous_epoch(state) current_epoch = Accessors.get_current_epoch(state) - old_previous_justified_checkpoint = state.previous_justified_checkpoint - old_current_justified_checkpoint = state.current_justified_checkpoint - - with {:ok, previous_block_root} <- Accessors.get_block_root(state, previous_epoch), - {:ok, current_block_root} <- Accessors.get_block_root(state, current_epoch) do - new_state = - state - |> update_first_bit() - |> update_previous_epoch_justified( - previous_epoch_target_balance * 3 >= total_active_balance * 2, - previous_epoch, - previous_block_root - ) - |> update_current_epoch_justified( - current_epoch_target_balance * 3 >= total_active_balance * 2, - current_epoch, - current_block_root - ) - |> update_checkpoint_finalization( - old_previous_justified_checkpoint, - current_epoch, - 1..3, - 3 - ) - |> update_checkpoint_finalization( - old_previous_justified_checkpoint, - current_epoch, - 1..2, - 2 - ) - |> update_checkpoint_finalization( - old_current_justified_checkpoint, - current_epoch, - 0..2, - 2 - ) - |> update_checkpoint_finalization( - old_current_justified_checkpoint, - current_epoch, - 0..1, - 1 - ) - - {:ok, new_state} + old_previous_justified = state.previous_justified_checkpoint + old_current_justified = state.current_justified_checkpoint + previous_is_justified = previous_target_balance * 3 >= total_active_balance * 2 + current_is_justified = current_target_balance * 3 >= total_active_balance * 2 + + new_state = update_first_bit(state) + + with {:ok, new_state} <- + update_epoch_justified(new_state, previous_is_justified, previous_epoch, 1), + {:ok, new_state} <- + update_epoch_justified(new_state, current_is_justified, current_epoch, 0) do + new_state + |> update_checkpoint_finalization(old_previous_justified, current_epoch, 1..3, 3) + |> update_checkpoint_finalization(old_previous_justified, current_epoch, 1..2, 2) + |> update_checkpoint_finalization(old_current_justified, current_epoch, 0..2, 2) + |> update_checkpoint_finalization(old_current_justified, current_epoch, 0..1, 1) + |> then(&{:ok, &1}) end end @@ -439,50 +410,21 @@ defmodule LambdaEthereumConsensus.StateTransition.EpochProcessing do } end - defp update_previous_epoch_justified(state, true, previous_epoch, previous_block_root) do - new_checkpoint = %SszTypes.Checkpoint{ - epoch: previous_epoch, - root: previous_block_root - } - - bits = - state.justification_bits - |> BitVector.new(4) - |> BitVector.set(1) - |> to_byte() - - %BeaconState{ - state - | current_justified_checkpoint: new_checkpoint, - justification_bits: bits - } - end - - defp update_previous_epoch_justified(state, false, _previous_epoch, _previous_block_root) do - state - end - - defp update_current_epoch_justified(state, true, current_epoch, current_block_root) do - new_checkpoint = %SszTypes.Checkpoint{ - epoch: current_epoch, - root: current_block_root - } + defp update_epoch_justified(state, false, _, _), do: {:ok, state} - bits = - state.justification_bits - |> BitVector.new(4) - |> BitVector.set(0) - |> to_byte() + defp update_epoch_justified(state, true, epoch, index) do + with {:ok, block_root} <- Accessors.get_block_root(state, epoch) do + new_checkpoint = %SszTypes.Checkpoint{epoch: epoch, root: block_root} - %BeaconState{ - state - | current_justified_checkpoint: new_checkpoint, - justification_bits: bits - } - end + bits = + state.justification_bits + |> BitVector.new(4) + |> BitVector.set(index) + |> to_byte() - defp update_current_epoch_justified(state, false, _current_epoch, _current_block_root) do - state + %{state | current_justified_checkpoint: new_checkpoint, justification_bits: bits} + |> then(&{:ok, &1}) + end end defp update_checkpoint_finalization( @@ -497,7 +439,7 @@ defmodule LambdaEthereumConsensus.StateTransition.EpochProcessing do |> BitVector.new(4) |> BitVector.all?(range) - if bits_set && old_justified_checkpoint.epoch + offset == current_epoch do + if bits_set and old_justified_checkpoint.epoch + offset == current_epoch do %BeaconState{state | finalized_checkpoint: old_justified_checkpoint} else state diff --git a/lib/spec/runners/fork_choice.ex b/lib/spec/runners/fork_choice.ex index 0e6044987..f4c51eabf 100644 --- a/lib/spec/runners/fork_choice.ex +++ b/lib/spec/runners/fork_choice.ex @@ -12,27 +12,27 @@ defmodule ForkChoiceTestRunner do @disabled_on_block_cases [ # "basic", - "incompatible_justification_update_end_of_epoch", - "incompatible_justification_update_start_of_epoch", - "justification_update_beginning_of_epoch", - "justification_update_end_of_epoch", - "justification_withholding", - "justification_withholding_reverse_order", - "justified_update_always_if_better", - "justified_update_monotonic", - "justified_update_not_realized_finality", - "new_finalized_slot_is_justified_checkpoint_ancestor", - "not_pull_up_current_epoch_block", + # "incompatible_justification_update_end_of_epoch", + # "incompatible_justification_update_start_of_epoch", + # "justification_update_beginning_of_epoch", + # "justification_update_end_of_epoch", + # "justification_withholding", + # "justification_withholding_reverse_order", + # "justified_update_always_if_better", + # "justified_update_monotonic", + # "justified_update_not_realized_finality", + # "new_finalized_slot_is_justified_checkpoint_ancestor", + # "not_pull_up_current_epoch_block", # "on_block_bad_parent_root", - "on_block_before_finalized", - "on_block_checkpoints", - "on_block_finalized_skip_slots", - "on_block_finalized_skip_slots_not_in_skip_chain", + # "on_block_before_finalized", + # "on_block_checkpoints", + # "on_block_finalized_skip_slots", + # "on_block_finalized_skip_slots_not_in_skip_chain", # "on_block_future_block", # "proposer_boost", # "proposer_boost_root_same_slot_untimely_block", - "pull_up_on_tick", - "pull_up_past_epoch_block" + # "pull_up_on_tick", + # "pull_up_past_epoch_block" ] @disabled_ex_ante_cases [ @@ -47,18 +47,18 @@ defmodule ForkChoiceTestRunner do # "chain_no_attestations", "discard_equivocations_on_attester_slashing", "discard_equivocations_slashed_validator_censoring", - "filtered_block_tree", + # "filtered_block_tree", # "genesis", "proposer_boost_correct_head", "shorter_chain_but_heavier_weight", "split_tie_breaker_no_attestations", - "voting_source_beyond_two_epoch", + # "voting_source_beyond_two_epoch", "voting_source_within_two_epoch" ] @disabled_reorg_cases [ - "delayed_justification_current_epoch", - "delayed_justification_previous_epoch", + # "delayed_justification_current_epoch", + # "delayed_justification_previous_epoch", "include_votes_another_empty_chain_with_enough_ffg_votes_current_epoch", "include_votes_another_empty_chain_with_enough_ffg_votes_previous_epoch", "include_votes_another_empty_chain_without_enough_ffg_votes_current_epoch", @@ -68,8 +68,8 @@ defmodule ForkChoiceTestRunner do ] @disabled_withholding_cases [ - "withholding_attack", - "withholding_attack_unviable_honest_chain" + "withholding_attack" + # "withholding_attack_unviable_honest_chain" ] @impl TestRunner @@ -207,8 +207,4 @@ defmodule ForkChoiceTestRunner do {:ok, store} end - - defp apply_step(_, _, _) do - {:error, "unknown step"} - end end diff --git a/lib/spec/runners/sanity.ex b/lib/spec/runners/sanity.ex index 6535858a2..218ea5dc8 100644 --- a/lib/spec/runners/sanity.ex +++ b/lib/spec/runners/sanity.ex @@ -40,22 +40,22 @@ defmodule SanityTestRunner do # "historical_batch", # "inactivity_scores_full_participation_leaking", # "inactivity_scores_leaking", - "invalid_all_zeroed_sig", - "invalid_duplicate_attester_slashing_same_block", - "invalid_duplicate_bls_changes_same_block", + # "invalid_all_zeroed_sig", + # "invalid_duplicate_attester_slashing_same_block", + # "invalid_duplicate_bls_changes_same_block", # "invalid_duplicate_deposit_same_block", - "invalid_duplicate_proposer_slashings_same_block", - "invalid_duplicate_validator_exit_same_block", - "invalid_incorrect_block_sig", + # "invalid_duplicate_proposer_slashings_same_block", + # "invalid_duplicate_validator_exit_same_block", + # "invalid_incorrect_block_sig", # "invalid_incorrect_proposer_index_sig_from_expected_proposer", # "invalid_incorrect_proposer_index_sig_from_proposer_index", - "invalid_incorrect_state_root", + # "invalid_incorrect_state_root", # "invalid_only_increase_deposit_count", # "invalid_parent_from_same_slot", # "invalid_prev_slot_block_transition", # "invalid_same_slot_block_transition", - "invalid_similar_proposer_slashings_same_block", - "invalid_two_bls_changes_of_different_addresses_same_validator_same_block", + # "invalid_similar_proposer_slashings_same_block", + # "invalid_two_bls_changes_of_different_addresses_same_validator_same_block", # "invalid_withdrawal_fail_second_block_payload_isnt_compatible", # "is_execution_enabled_false", # "many_partial_withdrawals_in_epoch_transition", @@ -69,7 +69,7 @@ defmodule SanityTestRunner do # "proposer_slashing", # "skipped_slots", # "slash_and_exit_diff_index", - "slash_and_exit_same_index" + # "slash_and_exit_same_index" # "sync_committee_committee__empty", # "sync_committee_committee__full", # "sync_committee_committee__half", @@ -87,8 +87,10 @@ defmodule SanityTestRunner do # "slots_1", # "slots_2", # "over_epoch_boundary", - "historical_accumulator", - "double_empty_epoch" + # NOTE: too long to run in CI + # TODO: optimize + "historical_accumulator" + # "double_empty_epoch" ] @impl TestRunner From db881a59e4ca184dc6fd6589bf9aa352cfbb7567 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Arjovsky?= Date: Wed, 6 Dec 2023 13:45:38 +0100 Subject: [PATCH 11/14] docs: add concurrency design doc (#499) --- docs/concurrency_design.md | 131 +++++++++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 docs/concurrency_design.md diff --git a/docs/concurrency_design.md b/docs/concurrency_design.md new file mode 100644 index 000000000..cf0f13552 --- /dev/null +++ b/docs/concurrency_design.md @@ -0,0 +1,131 @@ +# Store concurrency design + +## Current situation + +The following is a sequence diagram on the lifecycle of a block, from the moment its notification arrives in the LibP2P port and until it's processed and saved. Each lane is a separate process and may encompass many different modules. + +```mermaid +sequenceDiagram + +participant port as LibP2P Port
(Genserver) +participant sub as Subscriber
(GenStage) +participant consumer as GossipConsumer
(broadway) +participant pending as Pending Blocks
(GenServer) +participant store as Fork-choice store (GenServer) +participant DB as KV Store + +port ->> sub: gossip(id) +sub ->> port: accept(id) +sub ->> consumer: handle_demand +consumer ->> consumer: decompress
decode
call handler +consumer ->> pending: decoded block +pending ->> store: has_block(block) +store -->> pending: false +pending ->> store: has_block(parent) +store -->> pending: true +pending ->> store: on_block(block) +store ->> store: validate block
calculate state transition
add state +store ->> DB: store block +store -->> pending: :ok +``` + +Let's look at the main issues and some improvements that may help with them. + +### Blocking Calls + +`Store.on_block(block)` (write operation) is blocking. This operation is particularly big, as it performs the state transition. These causes some issues: + +- It's a call, so the calling process (in our case the pending blocks processor) will be blocked until the state transition is finished. No further blocks will be downloaded while this happens. +- Any other store call (adding an attestation, checking if a block is present) will be blocked. + +Improvements: + +- Making it a `cast`. The caller doesn't immediately need to know what's the result of the state transition. We can do that an async operation. +- Making the state transition be calculated in an async way, so the store can take other work like adding attestations while the cast happens. + +### Concurrent downloads + +Downloading a block is: + +- A heavy IO operation (non-cpu consuming). +- Independent from downloading a different block. + +Improvements: +- We should consider, instead of downloading them in sequence, downloading them in different tasks. + +### Big Objects in Mailboxes + +Blocks are pretty big objects and they are passed around in process mailboxes even for simple calls like `Store.has_block(block)`. We should minimize this kind of interactions as putting big structures in mailboxes slows their processing down. + +Improvements: + +- We could store the blocks in the DB immediately after downloading them. +- Checking if a block is present could be done directly with the DB, without need to check the store. +- If we want faster access for blocks, we can build an ETS block cache. + +### Other issues + +- States aren't ever stored in the DB. This is not a concurrency issue, but we should fix it. +- Low priority, but we should evaluate dropping the Subscriber genserver and broadway, and have one task per message under a supervisor. + +## State Diagram + +These are the states that a block may have: + +- New: just downloaded, decompressed and decoded +- Pending: no parent. +- Child. Parent is present and downloaded. +- BlockChild: Parent is a valid block. +- StateChild: Parent’s state transition is calculated. +- Included: we calculated the state transition for this block and the state is available. It's now part of the fork tree. + +The block diagram looks something like this: + +```mermaid +stateDiagram-v2 + [*] --> New: Download, decompress, decode + New --> Child: Parent is present + New --> Pending: Parent is not present + Pending --> Child: Parent is downloaded + Child --> BlockChild: Parent is a valid block (but not a state) + Child --> Invalid: Parent is Invalid + BlockChild --> Invalid: store validation fails + BlockChild --> StateChild: Parent state is present + StateChild --> NewState: state transition calculated + StateChild --> Invalid: state transition fails +``` + +### A possible new design + +```mermaid +sequenceDiagram + participant port as LibP2P Port
(Genserver) + participant decoder as Decoder
(Supervised task) + participant tracker as Block Tracker
(GenServer) + participant down as Downloader
(Supervised task) + participant store as Fork Choice Store
(Genserver) + participant state_t as State Transition Task
(Supervised task) + participant DB as KV Store + + port ->> decoder: gossip(id) + decoder ->> port: accept(id) + decoder ->> decoder: decompress
decode
call handler + decoder ->> DB: store_block_if_not_present(block) + decoder ->> tracker: new_block(root) + tracker ->> DB: present?(parent_root) + DB -->> tracker: false + tracker ->> down: download(parent_root) + down ->> DB: store_block_if_not_present(parent_root) + down ->> tracker: downloaded(parent_root) + tracker ->> store: on_block(root) + store ->> DB: get_block(root) + store ->> store: validate block + store ->> state_t: state_transition(block) + state_t ->> DB: store_state(new_state) + state_t ->> store: on_state(new_state) + state_t ->> tracker: on_state(new_state) +``` + +Some pending definitions: + +- The block tracker could eventually be a block cache, and maintain blocks and their state in an ETS that can be accessed easily by other processes. From 6e6f1065fc2a5c290b0cc33f70721d860bd7a982 Mon Sep 17 00:00:00 2001 From: Mete Karasakal <32202283+karasakalmt@users.noreply.github.com> Date: Wed, 6 Dec 2023 15:51:32 +0200 Subject: [PATCH 12/14] feat: merkle proof spec test (#496) --- .../state_transition/predicates.ex | 13 +++-- lib/spec/runners/light_client.ex | 52 +++++++++++++++++++ 2 files changed, 60 insertions(+), 5 deletions(-) create mode 100644 lib/spec/runners/light_client.ex diff --git a/lib/lambda_ethereum_consensus/state_transition/predicates.ex b/lib/lambda_ethereum_consensus/state_transition/predicates.ex index 189f01277..cf3885d31 100644 --- a/lib/lambda_ethereum_consensus/state_transition/predicates.ex +++ b/lib/lambda_ethereum_consensus/state_transition/predicates.ex @@ -123,11 +123,14 @@ defmodule LambdaEthereumConsensus.StateTransition.Predicates do SszTypes.root() ) :: boolean def is_valid_merkle_branch?(leaf, branch, depth, index, root) do - root == - branch - |> Enum.take(depth) - |> Enum.with_index() - |> Enum.reduce(leaf, fn {v, i}, value -> hash_merkle_node(v, value, index, i) end) + root == generate_merkle_proof(leaf, branch, depth, index) + end + + def generate_merkle_proof(leaf, branch, depth, index) do + branch + |> Enum.take(depth) + |> Enum.with_index() + |> Enum.reduce(leaf, fn {v, i}, value -> hash_merkle_node(v, value, index, i) end) end defp hash_merkle_node(value_1, value_2, index, i) do diff --git a/lib/spec/runners/light_client.ex b/lib/spec/runners/light_client.ex new file mode 100644 index 000000000..131d50b37 --- /dev/null +++ b/lib/spec/runners/light_client.ex @@ -0,0 +1,52 @@ +defmodule LightClientTestRunner do + alias LambdaEthereumConsensus.StateTransition.Predicates + use ExUnit.CaseTemplate + use TestRunner + + @moduledoc """ + Runner for LightClient test cases. See: https://github.com/ethereum/consensus-specs/tree/dev/tests/formats/light_client + """ + + # Remove handler from here once you implement the corresponding functions + @disabled_handlers [ + # "single_merkle_proof", + "sync", + "update_ranking" + ] + + @impl TestRunner + def skip?(%SpecTestCase{} = testcase) do + Enum.member?(@disabled_handlers, testcase.handler) + end + + @impl TestRunner + def run_test_case(%SpecTestCase{} = testcase) do + handle(testcase.handler, testcase) + end + + defp handle("single_merkle_proof", testcase) do + case_dir = SpecTestCase.dir(testcase) + + object_root = + SpecTestUtils.read_ssz_from_file!( + case_dir <> "/object.ssz_snappy", + String.to_existing_atom("Elixir.SszTypes." <> testcase.suite) + ) + |> Ssz.hash_tree_root!() + + %{leaf: leaf, leaf_index: leaf_index, branch: branch} = + YamlElixir.read_from_file!(case_dir <> "/proof.yaml") + |> SpecTestUtils.sanitize_yaml() + + res = + Predicates.is_valid_merkle_branch?( + leaf, + branch, + Constants.deposit_contract_tree_depth() + 1, + leaf_index, + object_root + ) + + assert true == res + end +end From b5a1adc82392fe142a3fddb6f4e2622ce70c8bd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 7 Dec 2023 11:56:13 -0300 Subject: [PATCH 13/14] fix: shuffling tests not being generated (#502) --- lib/mix/tasks/generate_spec_tests.ex | 15 +++++++++++---- lib/spec/runners/shuffling.ex | 9 +++++---- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/lib/mix/tasks/generate_spec_tests.ex b/lib/mix/tasks/generate_spec_tests.ex index 17d3d39cf..56a89210c 100644 --- a/lib/mix/tasks/generate_spec_tests.ex +++ b/lib/mix/tasks/generate_spec_tests.ex @@ -10,23 +10,30 @@ defmodule Mix.Tasks.GenerateSpecTests do use Mix.Task require Logger - @configs ["mainnet", "minimal"] - @forks ["altair", "deneb", "phase0"] + @configs ["mainnet", "minimal", "general"] + @forks ["phase0", "altair", "bellatrix", "capella", "deneb"] @shortdoc "Generates tests for spec test files" @impl Mix.Task def run(_args) do {:ok, file_names} = File.ls(Path.join(["lib", "spec", "runners"])) - runners = Enum.map(file_names, fn file_name -> Path.basename(file_name, ".ex") end) + runners = Enum.map(file_names, &Path.basename(&1, ".ex")) + # Generate all tests for Capella fork for config <- @configs, runner <- runners do generate_test(config, "capella", runner) end + # Generate tests for all forks in general preset for fork <- @forks, runner <- runners do generate_test("general", fork, runner) end + # Generate shuffling tests for all testcases + for config <- @configs, fork <- @forks do + generate_test(config, fork, "shuffling") + end + File.touch(Path.join(["test", "generated"])) end @@ -45,9 +52,9 @@ defmodule Mix.Tasks.GenerateSpecTests do end defp test_module(cases, config, fork, runner) do - r = Macro.camelize(runner) c = Macro.camelize(config) f = Macro.camelize(fork) + r = Macro.camelize(runner) module_name = "Elixir.#{c}.#{f}.#{r}Test" |> String.to_atom() runner_module = "Elixir.#{r}TestRunner" |> String.to_atom() diff --git a/lib/spec/runners/shuffling.ex b/lib/spec/runners/shuffling.ex index f6cec793e..740c7e714 100644 --- a/lib/spec/runners/shuffling.ex +++ b/lib/spec/runners/shuffling.ex @@ -1,12 +1,13 @@ defmodule ShufflingTestRunner do - alias LambdaEthereumConsensus.StateTransition.Misc - use ExUnit.CaseTemplate - use TestRunner - @moduledoc """ Runner for Operations test cases. See: https://github.com/ethereum/consensus-specs/tree/dev/tests/formats/shuffling """ + use ExUnit.CaseTemplate + use TestRunner + + alias LambdaEthereumConsensus.StateTransition.Misc + # Remove handler from here once you implement the corresponding functions @disabled_handlers [ # "core" From 99f49f911827126a6414f9779c0d0457664ef9a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Thu, 7 Dec 2023 11:56:33 -0300 Subject: [PATCH 14/14] fix: wrong head resolving in fork-choice spec-tests (#504) --- .../fork_choice/handlers.ex | 2 +- .../fork_choice/helpers.ex | 30 +++++++--------- lib/spec/runners/fork_choice.ex | 36 +++++++++---------- 3 files changed, 31 insertions(+), 37 deletions(-) diff --git a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex index 509e468a4..e6223e954 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex @@ -336,7 +336,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do message = %Checkpoint{epoch: target.epoch, root: beacon_block_root} attesting_indices - |> Stream.filter(&MapSet.member?(store.equivocating_indices, &1)) + |> Stream.reject(&MapSet.member?(store.equivocating_indices, &1)) |> Stream.filter(&(not Map.has_key?(messages, &1) or target.epoch > messages[&1].epoch)) |> Enum.reduce(messages, &Map.put(&2, &1, message)) |> then(&{:ok, %Store{store | latest_messages: &1}}) diff --git a/lib/lambda_ethereum_consensus/fork_choice/helpers.ex b/lib/lambda_ethereum_consensus/fork_choice/helpers.ex index ce38e83ee..1ac8c5d93 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/helpers.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/helpers.ex @@ -58,16 +58,15 @@ defmodule LambdaEthereumConsensus.ForkChoice.Helpers do Stream.cycle([nil]) |> Enum.reduce_while(head, fn nil, head -> - children = - blocks - |> Stream.filter(fn {_, block} -> block.parent_root == head end) - |> Enum.map(fn {root, _} -> root end) - - if Enum.empty?(children) do - {:halt, head} - else - {:cont, Enum.max_by(children, fn root -> get_weight(store, root) end)} - end + blocks + |> Stream.filter(fn {_, block} -> block.parent_root == head end) + |> Stream.map(fn {root, _} -> root end) + # Ties broken by favoring block with lexicographically higher root + |> Enum.sort(:desc) + |> then(fn + [] -> {:halt, head} + c -> {:cont, Enum.max_by(c, &get_weight(store, &1))} + end) end) |> then(&{:ok, &1}) end @@ -75,18 +74,15 @@ defmodule LambdaEthereumConsensus.ForkChoice.Helpers do defp get_weight(%Store{} = store, root) do state = store.checkpoint_states[store.justified_checkpoint] - unslashed_and_active_indices = - Accessors.get_active_validator_indices(state, Accessors.get_current_epoch(state)) - |> Enum.filter(fn i -> not Enum.at(state.validators, i).slashed end) - attestation_score = - unslashed_and_active_indices + Accessors.get_active_validator_indices(state, Accessors.get_current_epoch(state)) + |> Stream.reject(&Enum.at(state.validators, &1).slashed) |> Stream.filter(&Map.has_key?(store.latest_messages, &1)) - |> Stream.filter(&(not MapSet.member?(store.equivocating_indices, &1))) + |> Stream.reject(&MapSet.member?(store.equivocating_indices, &1)) |> Stream.filter(fn i -> Store.get_ancestor(store, store.latest_messages[i].root, store.blocks[root].slot) == root end) - |> Stream.map(fn i -> Enum.at(state.validators, i).effective_balance end) + |> Stream.map(&Enum.at(state.validators, &1).effective_balance) |> Enum.sum() if store.proposer_boost_root == <<0::256>> or diff --git a/lib/spec/runners/fork_choice.ex b/lib/spec/runners/fork_choice.ex index f4c51eabf..34be34a8e 100644 --- a/lib/spec/runners/fork_choice.ex +++ b/lib/spec/runners/fork_choice.ex @@ -37,38 +37,38 @@ defmodule ForkChoiceTestRunner do @disabled_ex_ante_cases [ # "ex_ante_attestations_is_greater_than_proposer_boost_with_boost", - "ex_ante_sandwich_with_boost_not_sufficient", - "ex_ante_sandwich_with_honest_attestation", - "ex_ante_sandwich_without_attestations" + # "ex_ante_sandwich_with_boost_not_sufficient", + # "ex_ante_sandwich_with_honest_attestation", + # "ex_ante_sandwich_without_attestations", # "ex_ante_vanilla" ] @disabled_get_head_cases [ # "chain_no_attestations", - "discard_equivocations_on_attester_slashing", - "discard_equivocations_slashed_validator_censoring", + # "discard_equivocations_on_attester_slashing", + # "discard_equivocations_slashed_validator_censoring", # "filtered_block_tree", # "genesis", - "proposer_boost_correct_head", - "shorter_chain_but_heavier_weight", - "split_tie_breaker_no_attestations", + # "proposer_boost_correct_head", + # "shorter_chain_but_heavier_weight", + # "split_tie_breaker_no_attestations", # "voting_source_beyond_two_epoch", - "voting_source_within_two_epoch" + # "voting_source_within_two_epoch" ] @disabled_reorg_cases [ # "delayed_justification_current_epoch", # "delayed_justification_previous_epoch", - "include_votes_another_empty_chain_with_enough_ffg_votes_current_epoch", - "include_votes_another_empty_chain_with_enough_ffg_votes_previous_epoch", - "include_votes_another_empty_chain_without_enough_ffg_votes_current_epoch", - "simple_attempted_reorg_delayed_justification_current_epoch", - "simple_attempted_reorg_delayed_justification_previous_epoch", - "simple_attempted_reorg_without_enough_ffg_votes" + # "include_votes_another_empty_chain_with_enough_ffg_votes_current_epoch", + # "include_votes_another_empty_chain_with_enough_ffg_votes_previous_epoch", + # "include_votes_another_empty_chain_without_enough_ffg_votes_current_epoch", + # "simple_attempted_reorg_delayed_justification_current_epoch", + # "simple_attempted_reorg_delayed_justification_previous_epoch", + # "simple_attempted_reorg_without_enough_ffg_votes" ] @disabled_withholding_cases [ - "withholding_attack" + # "withholding_attack", # "withholding_attack_unviable_honest_chain" ] @@ -81,9 +81,7 @@ defmodule ForkChoiceTestRunner do Enum.member?(@disabled_withholding_cases, testcase) end - def skip?(_testcase) do - true - end + def skip?(_testcase), do: true @impl TestRunner def run_test_case(testcase) do