Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fork choice metrics #1232

Merged
merged 2 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,13 @@ defmodule LambdaEthereumConsensus.ForkChoice do
end
end

@spec apply_handler(any(), any(), any()) :: any()
defp apply_handler(iter, state, handler) do
iter
|> Enum.reduce_while({:ok, state}, fn
x, {:ok, st} -> {:cont, handler.(st, x)}
_, {:error, _} = err -> {:halt, err}
defp apply_handler(iter, name, state, handler) do
Metrics.span_operation(name, nil, nil, fn ->
iter
|> Enum.reduce_while({:ok, state}, fn
x, {:ok, st} -> {:cont, handler.(st, x)}
_, {:error, _} = err -> {:halt, err}
end)
end)
end

Expand All @@ -212,11 +213,11 @@ defmodule LambdaEthereumConsensus.ForkChoice do
# process block attestations
{:ok, new_store} <-
signed_block.message.body.attestations
|> apply_handler(new_store, &Handlers.on_attestation(&1, &2, true)),
|> apply_handler(:attestations, new_store, &Handlers.on_attestation(&1, &2, true)),
# process block attester slashings
{:ok, new_store} <-
signed_block.message.body.attester_slashings
|> apply_handler(new_store, &Handlers.on_attester_slashing/2) do
|> apply_handler(:attester_slashings, new_store, &Handlers.on_attester_slashing/2) do
Handlers.prune_checkpoint_states(new_store)
{:ok, new_store}
end
Expand Down
6 changes: 6 additions & 0 deletions lib/lambda_ethereum_consensus/metrics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ defmodule LambdaEthereumConsensus.Metrics do
end
end

def span_operation(handler, transition, operation, f) do
:telemetry.span([:fork_choice, :latency], %{}, fn ->
{f.(), %{handler: handler, transition: transition, operation: operation}}
end)
end

defp map_color(:transitioned), do: "blue"
defp map_color(:pending), do: "green"
defp map_color(:download_blobs), do: "yellow"
Expand Down
33 changes: 23 additions & 10 deletions lib/lambda_ethereum_consensus/state_transition/operations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do
This module contains functions for handling state transition
"""

alias LambdaEthereumConsensus.Metrics
alias LambdaEthereumConsensus.StateTransition.Accessors
alias LambdaEthereumConsensus.StateTransition.Math
alias LambdaEthereumConsensus.StateTransition.Misc
Expand Down Expand Up @@ -919,19 +920,31 @@ defmodule LambdaEthereumConsensus.StateTransition.Operations do
# Ensure that outstanding deposits are processed up to the maximum number of deposits
with :ok <- verify_deposits(state, body) do
{:ok, state}
|> for_ops(body.proposer_slashings, &process_proposer_slashing/2)
|> for_ops(body.attester_slashings, &process_attester_slashing/2)
|> Utils.map_ok(&process_attestation_batch(&1, body.attestations))
|> for_ops(body.deposits, &process_deposit/2)
|> for_ops(body.voluntary_exits, &process_voluntary_exit/2)
|> for_ops(body.bls_to_execution_changes, &process_bls_to_execution_change/2)
|> for_ops(:proposer_slashing, body.proposer_slashings, &process_proposer_slashing/2)
|> for_ops(:attester_slashing, body.attester_slashings, &process_attester_slashing/2)
|> apply_op(:attestation_batch, &process_attestation_batch(&1, body.attestations))
|> for_ops(:deposit, body.deposits, &process_deposit/2)
|> for_ops(:voluntary_exit, body.voluntary_exits, &process_voluntary_exit/2)
|> for_ops(
:bls_to_execution_change,
body.bls_to_execution_changes,
&process_bls_to_execution_change/2
)
end
end

defp for_ops(acc, operations, func) do
Enum.reduce_while(operations, acc, fn
operation, {:ok, state} -> {:cont, func.(state, operation)}
_, {:error, reason} -> {:halt, {:error, reason}}
defp apply_op(acc, op_name, func) do
Metrics.span_operation(:on_block, :process_block_operations, op_name, fn ->
Utils.map_ok(acc, func)
end)
end

defp for_ops(acc, op_name, operations, func) do
Metrics.span_operation(:on_block, :process_block_operations, op_name, fn ->
Enum.reduce_while(operations, acc, fn
operation, {:ok, state} -> {:cont, func.(state, operation)}
_, {:error, reason} -> {:halt, {:error, reason}}
end)
end)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.StateTransition do
"""

require Logger
alias LambdaEthereumConsensus.Metrics
alias LambdaEthereumConsensus.StateTransition.Accessors
alias LambdaEthereumConsensus.StateTransition.EpochProcessing
alias LambdaEthereumConsensus.StateTransition.Misc
Expand Down Expand Up @@ -108,17 +109,23 @@ defmodule LambdaEthereumConsensus.StateTransition do

state
|> EpochProcessing.process_justification_and_finalization()
|> map_ok(&EpochProcessing.process_inactivity_updates/1)
|> map_ok(&EpochProcessing.process_rewards_and_penalties/1)
|> map_ok(&EpochProcessing.process_registry_updates/1)
|> map_ok(&EpochProcessing.process_slashings/1)
|> map_ok(&EpochProcessing.process_eth1_data_reset/1)
|> map_ok(&EpochProcessing.process_effective_balance_updates/1)
|> map_ok(&EpochProcessing.process_slashings_reset/1)
|> map_ok(&EpochProcessing.process_randao_mixes_reset/1)
|> map_ok(&EpochProcessing.process_historical_summaries_update/1)
|> map_ok(&EpochProcessing.process_participation_flag_updates/1)
|> map_ok(&EpochProcessing.process_sync_committee_updates/1)
|> epoch_op(:inactivity_updates, &EpochProcessing.process_inactivity_updates/1)
|> epoch_op(:rewards_and_penalties, &EpochProcessing.process_rewards_and_penalties/1)
|> epoch_op(:registry_updates, &EpochProcessing.process_registry_updates/1)
|> epoch_op(:slashings, &EpochProcessing.process_slashings/1)
|> epoch_op(:eth1_data_reset, &EpochProcessing.process_eth1_data_reset/1)
|> epoch_op(:effective_balance_updates, &EpochProcessing.process_effective_balance_updates/1)
|> epoch_op(:slashings_reset, &EpochProcessing.process_slashings_reset/1)
|> epoch_op(:randao_mixes_reset, &EpochProcessing.process_randao_mixes_reset/1)
|> epoch_op(
:historical_summaries_update,
&EpochProcessing.process_historical_summaries_update/1
)
|> epoch_op(
:participation_flag_updates,
&EpochProcessing.process_participation_flag_updates/1
)
|> epoch_op(:sync_committee_updates, &EpochProcessing.process_sync_committee_updates/1)
|> tap(fn _ ->
end_time = System.monotonic_time(:millisecond)
Logger.debug("[Epoch processing] took #{end_time - start_time} ms")
Expand All @@ -136,16 +143,26 @@ defmodule LambdaEthereumConsensus.StateTransition do
start_time = System.monotonic_time(:millisecond)

{:ok, state}
|> map_ok(&Operations.process_block_header(&1, block))
|> map_ok(&Operations.process_withdrawals(&1, block.body.execution_payload))
|> map_ok(&Operations.process_execution_payload(&1, block.body))
|> map_ok(&Operations.process_randao(&1, block.body))
|> map_ok(&Operations.process_eth1_data(&1, block.body))
|> block_op(:block_header, &Operations.process_block_header(&1, block))
|> block_op(:withdrawals, &Operations.process_withdrawals(&1, block.body.execution_payload))
|> block_op(:execution_payload, &Operations.process_execution_payload(&1, block.body))
|> block_op(:randao, &Operations.process_randao(&1, block.body))
|> block_op(:eth1_data, &Operations.process_eth1_data(&1, block.body))
|> map_ok(&Operations.process_operations(&1, block.body))
|> map_ok(&Operations.process_sync_aggregate(&1, block.body.sync_aggregate))
|> block_op(
:sync_aggregate,
&Operations.process_sync_aggregate(&1, block.body.sync_aggregate)
)
|> tap(fn _ ->
end_time = System.monotonic_time(:millisecond)
Logger.debug("[Block processing] took #{end_time - start_time} ms")
end)
end

def block_op(state, operation, f), do: apply_op(state, :process_block, operation, f)
def epoch_op(state, operation, f), do: apply_op(state, :epoch, operation, f)

def apply_op(state, transition, operation, f) do
Metrics.span_operation(:on_block, transition, operation, fn -> map_ok(state, f) end)
end
end
4 changes: 4 additions & 0 deletions lib/lambda_ethereum_consensus/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ defmodule LambdaEthereumConsensus.Telemetry do
tags: [:module, :action]
),
counter("db.latency.stop.count", unit: {:native, :millisecond}, tags: [:module, :action]),
last_value("fork_choice.latency.stop.duration",
unit: {:native, :millisecond},
tags: [:handler, :transition, :operation]
),

# ForkChoice Metrics
last_value("fork_choice.recompute_head.stop.duration",
Expand Down
Loading
Loading