Skip to content

Commit

Permalink
feat: add metrics to main libp2pport handlers (#1226)
Browse files Browse the repository at this point in the history
  • Loading branch information
avilagaston9 authored Jul 26, 2024
1 parent e8a622d commit 111036d
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 338 deletions.
38 changes: 22 additions & 16 deletions lib/lambda_ethereum_consensus/metrics.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,22 +88,6 @@ defmodule LambdaEthereumConsensus.Metrics do
block_status_execute(root, new_status, slot, 1)
end

defp block_status_execute(root, status, slot, value) do
hex_root = Base.encode16(root)

Logger.debug(
"[Metrics] slot = #{inspect(slot)}, status = #{inspect(status)}, value = #{inspect(value)}"
)

:telemetry.execute([:blocks, :status], %{total: value}, %{
id: hex_root,
mainstat: status,
color: map_color(status),
title: slot,
detail__root: hex_root
})
end

def block_relationship(nil, _), do: :ok

def block_relationship(parent_root, root) do
Expand All @@ -126,6 +110,28 @@ defmodule LambdaEthereumConsensus.Metrics do
end)
end

def handler_span(module, action, f) do
:telemetry.span([:libp2pport, :handler], %{}, fn ->
{f.(), %{module: module, action: action}}
end)
end

defp block_status_execute(root, status, slot, value) do
hex_root = Base.encode16(root)

Logger.debug(
"[Metrics] slot = #{inspect(slot)}, status = #{inspect(status)}, value = #{inspect(value)}"
)

:telemetry.execute([:blocks, :status], %{total: value}, %{
id: hex_root,
mainstat: status,
color: map_color(status),
title: slot,
detail__root: hex_root
})
end

defp map_color(:transitioned), do: "blue"
defp map_color(:pending), do: "green"
defp map_color(:download_blobs), do: "yellow"
Expand Down
15 changes: 13 additions & 2 deletions lib/lambda_ethereum_consensus/p2p/blob_downloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do
require Logger

alias LambdaEthereumConsensus.Libp2pPort
alias LambdaEthereumConsensus.Metrics
alias LambdaEthereumConsensus.P2P
alias LambdaEthereumConsensus.P2P.ReqResp
alias Types.BlobSidecar
Expand Down Expand Up @@ -38,7 +39,13 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do
|> ReqResp.encode_request()

Libp2pPort.send_async_request(peer_id, @blobs_by_range_protocol_id, request, fn response ->
handle_blobs_by_range_response(response, peer_id, count, slot, retries, on_blobs)
Metrics.handler_span(
"response_handler",
"blob_sidecars_by_range",
fn ->
handle_blobs_by_range_response(response, peer_id, count, slot, retries, on_blobs)
end
)
end)
end

Expand Down Expand Up @@ -85,7 +92,11 @@ defmodule LambdaEthereumConsensus.P2P.BlobDownloader do
request = ReqResp.encode_request({identifiers, TypeAliases.blob_sidecars_by_root_request()})

Libp2pPort.send_async_request(peer_id, @blobs_by_root_protocol_id, request, fn response ->
handle_blobs_by_root(response, peer_id, identifiers, retries, on_blobs)
Metrics.handler_span(
"response_handler",
"blob_sidecars_by_root",
fn -> handle_blobs_by_root(response, peer_id, identifiers, retries, on_blobs) end
)
end)
end

Expand Down
15 changes: 13 additions & 2 deletions lib/lambda_ethereum_consensus/p2p/block_downloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do
require Logger

alias LambdaEthereumConsensus.Libp2pPort
alias LambdaEthereumConsensus.Metrics
alias LambdaEthereumConsensus.P2P
alias LambdaEthereumConsensus.P2P.ReqResp
alias Types.SignedBeaconBlock
Expand Down Expand Up @@ -65,7 +66,13 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do
|> ReqResp.encode_request()

Libp2pPort.send_async_request(peer_id, @blocks_by_range_protocol_id, request, fn response ->
handle_blocks_by_range_response(response, slot, count, retries, peer_id, on_blocks)
Metrics.handler_span(
"response_handler",
"blocks_by_range",
fn ->
handle_blocks_by_range_response(response, slot, count, retries, peer_id, on_blocks)
end
)
end)
end

Expand Down Expand Up @@ -127,7 +134,11 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do
request = ReqResp.encode_request({roots, TypeAliases.beacon_blocks_by_root_request()})

Libp2pPort.send_async_request(peer_id, @blocks_by_root_protocol_id, request, fn response ->
handle_blocks_by_root_response(response, roots, on_blocks, peer_id, retries)
Metrics.handler_span(
"response_handler",
"blocks_by_root",
fn -> handle_blocks_by_root_response(response, roots, on_blocks, peer_id, retries) end
)
end)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.P2P.IncomingRequestsHandler do
"""

alias LambdaEthereumConsensus.ForkChoice
alias LambdaEthereumConsensus.Metrics
alias LambdaEthereumConsensus.P2P.Metadata
alias LambdaEthereumConsensus.P2P.ReqResp
alias LambdaEthereumConsensus.Store.BlockDb
Expand Down Expand Up @@ -31,8 +32,8 @@ defmodule LambdaEthereumConsensus.P2P.IncomingRequestsHandler do
Logger.debug("'#{name}' request received")

result =
:telemetry.span([:port, :request], %{}, fn ->
{handle_req(name, message_id, message), %{module: "handler", request: inspect(name)}}
Metrics.handler_span("request_handler", name |> String.split("/") |> List.first(), fn ->
handle_req(name, message_id, message)
end)

case result do
Expand Down
20 changes: 12 additions & 8 deletions lib/lambda_ethereum_consensus/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,6 @@ defmodule LambdaEthereumConsensus.Telemetry do
counter("network.pubsub_topics_un_deliverable_message.count", tags: [:topic]),
counter("network.pubsub_topics_validate_message.count", tags: [:topic]),
counter("port.message.count", tags: [:function, :direction]),
last_value("port.request.stop.duration",
unit: {:native, :millisecond},
tags: [:module, :request]
),
last_value("port.request.exception.duration",
unit: {:native, :millisecond},
tags: [:module, :request]
),
sum("network.request.blocks", tags: [:result, :type, :reason]),

# Sync metrics
Expand Down Expand Up @@ -140,6 +132,18 @@ defmodule LambdaEthereumConsensus.Telemetry do
tags: [:module, :action]
),
counter("db.latency.stop.count", unit: {:native, :millisecond}, tags: [:module, :action]),
last_value("libp2pport.handler.stop.duration",
unit: {:native, :millisecond},
tags: [:module, :action]
),
last_value("libp2pport.handler.exception.duration",
unit: {:native, :millisecond},
tags: [:module, :action]
),
counter("libp2pport.handler.stop.count",
unit: {:native, :millisecond},
tags: [:module, :action]
),
last_value("fork_choice.latency.stop.duration",
unit: {:native, :millisecond},
tags: [:handler, :transition, :operation]
Expand Down
9 changes: 7 additions & 2 deletions lib/libp2p_port.ex
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,13 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
})

case Map.fetch(subscribers, gs.topic) do
{:ok, module} -> module.handle_gossip_message(gs.topic, gs.msg_id, gs.message)
:error -> Logger.error("[Gossip] Received gossip from unknown topic: #{gs.topic}.")
{:ok, module} ->
Metrics.handler_span("gossip_handler", gs.topic, fn ->
module.handle_gossip_message(gs.topic, gs.msg_id, gs.message)
end)

:error ->
Logger.error("[Gossip] Received gossip from unknown topic: #{gs.topic}.")
end

state
Expand Down
Loading

0 comments on commit 111036d

Please sign in to comment.