Skip to content

Commit

Permalink
feat: fork choice metrics (#1232)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkenan authored and avilagaston9 committed Jul 23, 2024
1 parent 1fcf95f commit ea186a2
Show file tree
Hide file tree
Showing 6 changed files with 499 additions and 366 deletions.
17 changes: 9 additions & 8 deletions lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,13 @@ defmodule LambdaEthereumConsensus.ForkChoice do
end
end

@spec apply_handler(any(), any(), any()) :: any()
defp apply_handler(iter, state, handler) do
iter
|> Enum.reduce_while({:ok, state}, fn
x, {:ok, st} -> {:cont, handler.(st, x)}
_, {:error, _} = err -> {:halt, err}
defp apply_handler(iter, name, state, handler) do
Metrics.span_operation(name, nil, nil, fn ->
iter
|> Enum.reduce_while({:ok, state}, fn
x, {:ok, st} -> {:cont, handler.(st, x)}
_, {:error, _} = err -> {:halt, err}
end)
end)
end

Expand All @@ -212,11 +213,11 @@ defmodule LambdaEthereumConsensus.ForkChoice do
# process block attestations
{:ok, new_store} <-
signed_block.message.body.attestations
|> apply_handler(new_store, &Handlers.on_attestation(&1, &2, true)),
|> apply_handler(:attestations, new_store, &Handlers.on_attestation(&1, &2, true)),
# process block attester slashings
{:ok, new_store} <-
signed_block.message.body.attester_slashings
|> apply_handler(new_store, &Handlers.on_attester_slashing/2) do
|> apply_handler(:attester_slashings, new_store, &Handlers.on_attester_slashing/2) do
Handlers.prune_checkpoint_states(new_store)
{:ok, new_store}
end
Expand Down
6 changes: 6 additions & 0 deletions lib/lambda_ethereum_consensus/metrics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ defmodule LambdaEthereumConsensus.Metrics do
end
end

def span_operation(handler, transition, operation, f) do
:telemetry.span([:fork_choice, :latency], %{}, fn ->
{f.(), %{handler: handler, transition: transition, operation: operation}}
end)
end

def handler_span(module, action, f) do
:telemetry.span([:libp2pport, :handler], %{}, fn ->
{f.(), %{module: module, action: action}}
Expand Down
33 changes: 23 additions & 10 deletions lib/lambda_ethereum_consensus/state_transition/operations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do
This module contains functions for handling state transition
"""

alias LambdaEthereumConsensus.Metrics
alias LambdaEthereumConsensus.StateTransition.Accessors
alias LambdaEthereumConsensus.StateTransition.Math
alias LambdaEthereumConsensus.StateTransition.Misc
Expand Down Expand Up @@ -919,19 +920,31 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do
# Ensure that outstanding deposits are processed up to the maximum number of deposits
with :ok <- verify_deposits(state, body) do
{:ok, state}
|> for_ops(body.proposer_slashings, &process_proposer_slashing/2)
|> for_ops(body.attester_slashings, &process_attester_slashing/2)
|> Utils.map_ok(&process_attestation_batch(&1, body.attestations))
|> for_ops(body.deposits, &process_deposit/2)
|> for_ops(body.voluntary_exits, &process_voluntary_exit/2)
|> for_ops(body.bls_to_execution_changes, &process_bls_to_execution_change/2)
|> for_ops(:proposer_slashing, body.proposer_slashings, &process_proposer_slashing/2)
|> for_ops(:attester_slashing, body.attester_slashings, &process_attester_slashing/2)
|> apply_op(:attestation_batch, &process_attestation_batch(&1, body.attestations))
|> for_ops(:deposit, body.deposits, &process_deposit/2)
|> for_ops(:voluntary_exit, body.voluntary_exits, &process_voluntary_exit/2)
|> for_ops(
:bls_to_execution_change,
body.bls_to_execution_changes,
&process_bls_to_execution_change/2
)
end
end

defp for_ops(acc, operations, func) do
Enum.reduce_while(operations, acc, fn
operation, {:ok, state} -> {:cont, func.(state, operation)}
_, {:error, reason} -> {:halt, {:error, reason}}
defp apply_op(acc, op_name, func) do
Metrics.span_operation(:on_block, :process_block_operations, op_name, fn ->
Utils.map_ok(acc, func)
end)
end

defp for_ops(acc, op_name, operations, func) do
Metrics.span_operation(:on_block, :process_block_operations, op_name, fn ->
Enum.reduce_while(operations, acc, fn
operation, {:ok, state} -> {:cont, func.(state, operation)}
_, {:error, reason} -> {:halt, {:error, reason}}
end)
end)
end

Expand Down
51 changes: 34 additions & 17 deletions lib/lambda_ethereum_consensus/state_transition/state_transition.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.StateTransition do
"""

require Logger
alias LambdaEthereumConsensus.Metrics
alias LambdaEthereumConsensus.StateTransition.Accessors
alias LambdaEthereumConsensus.StateTransition.EpochProcessing
alias LambdaEthereumConsensus.StateTransition.Misc
Expand Down Expand Up @@ -108,17 +109,23 @@ defmodule LambdaEthereumConsensus.StateTransition do

state
|> EpochProcessing.process_justification_and_finalization()
|> map_ok(&EpochProcessing.process_inactivity_updates/1)
|> map_ok(&EpochProcessing.process_rewards_and_penalties/1)
|> map_ok(&EpochProcessing.process_registry_updates/1)
|> map_ok(&EpochProcessing.process_slashings/1)
|> map_ok(&EpochProcessing.process_eth1_data_reset/1)
|> map_ok(&EpochProcessing.process_effective_balance_updates/1)
|> map_ok(&EpochProcessing.process_slashings_reset/1)
|> map_ok(&EpochProcessing.process_randao_mixes_reset/1)
|> map_ok(&EpochProcessing.process_historical_summaries_update/1)
|> map_ok(&EpochProcessing.process_participation_flag_updates/1)
|> map_ok(&EpochProcessing.process_sync_committee_updates/1)
|> epoch_op(:inactivity_updates, &EpochProcessing.process_inactivity_updates/1)
|> epoch_op(:rewards_and_penalties, &EpochProcessing.process_rewards_and_penalties/1)
|> epoch_op(:registry_updates, &EpochProcessing.process_registry_updates/1)
|> epoch_op(:slashings, &EpochProcessing.process_slashings/1)
|> epoch_op(:eth1_data_reset, &EpochProcessing.process_eth1_data_reset/1)
|> epoch_op(:effective_balance_updates, &EpochProcessing.process_effective_balance_updates/1)
|> epoch_op(:slashings_reset, &EpochProcessing.process_slashings_reset/1)
|> epoch_op(:randao_mixes_reset, &EpochProcessing.process_randao_mixes_reset/1)
|> epoch_op(
:historical_summaries_update,
&EpochProcessing.process_historical_summaries_update/1
)
|> epoch_op(
:participation_flag_updates,
&EpochProcessing.process_participation_flag_updates/1
)
|> epoch_op(:sync_committee_updates, &EpochProcessing.process_sync_committee_updates/1)
|> tap(fn _ ->
end_time = System.monotonic_time(:millisecond)
Logger.debug("[Epoch processing] took #{end_time - start_time} ms")
Expand All @@ -136,16 +143,26 @@ defmodule LambdaEthereumConsensus.StateTransition do
start_time = System.monotonic_time(:millisecond)

{:ok, state}
|> map_ok(&Operations.process_block_header(&1, block))
|> map_ok(&Operations.process_withdrawals(&1, block.body.execution_payload))
|> map_ok(&Operations.process_execution_payload(&1, block.body))
|> map_ok(&Operations.process_randao(&1, block.body))
|> map_ok(&Operations.process_eth1_data(&1, block.body))
|> block_op(:block_header, &Operations.process_block_header(&1, block))
|> block_op(:withdrawals, &Operations.process_withdrawals(&1, block.body.execution_payload))
|> block_op(:execution_payload, &Operations.process_execution_payload(&1, block.body))
|> block_op(:randao, &Operations.process_randao(&1, block.body))
|> block_op(:eth1_data, &Operations.process_eth1_data(&1, block.body))
|> map_ok(&Operations.process_operations(&1, block.body))
|> map_ok(&Operations.process_sync_aggregate(&1, block.body.sync_aggregate))
|> block_op(
:sync_aggregate,
&Operations.process_sync_aggregate(&1, block.body.sync_aggregate)
)
|> tap(fn _ ->
end_time = System.monotonic_time(:millisecond)
Logger.debug("[Block processing] took #{end_time - start_time} ms")
end)
end

def block_op(state, operation, f), do: apply_op(state, :process_block, operation, f)
def epoch_op(state, operation, f), do: apply_op(state, :epoch, operation, f)

def apply_op(state, transition, operation, f) do
Metrics.span_operation(:on_block, transition, operation, fn -> map_ok(state, f) end)
end
end
5 changes: 4 additions & 1 deletion lib/lambda_ethereum_consensus/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,11 @@ defmodule LambdaEthereumConsensus.Telemetry do
unit: {:native, :millisecond},
tags: [:module, :action]
),

# ForkChoice Metrics
last_value("fork_choice.latency.stop.duration",
unit: {:native, :millisecond},
tags: [:handler, :transition, :operation]
),
last_value("fork_choice.recompute_head.stop.duration",
unit: {:native, :millisecond}
),
Expand Down
Loading

0 comments on commit ea186a2

Please sign in to comment.