From 7117772fb358abe6b2ef7924be4ec020acb2a54f Mon Sep 17 00:00:00 2001 From: Akihito Nakano Date: Wed, 20 Mar 2024 15:45:16 +0900 Subject: [PATCH] Fix peer count metrics (#5404) * Set the peers_per_client metrics directly, rather than using increment/decrement * Move PEERS_CONNECTED related update to the same place * Move PEERS_CONNECTED_MULTI related update to the same place * Rename * Remove unused variables --- .../src/peer_manager/mod.rs | 84 +++++++++++++++---- .../src/peer_manager/network_behaviour.rs | 59 ++----------- 2 files changed, 75 insertions(+), 68 deletions(-) diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 92f876ee032..8e5290630a5 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -26,6 +26,8 @@ pub use libp2p::identity::Keypair; #[allow(clippy::mutable_key_type)] // PeerId in hashmaps are no longer permitted by clippy pub mod peerdb; +use crate::peer_manager::peerdb::client::ClientKind; +use libp2p::multiaddr; pub use peerdb::peer_info::{ ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo, }; @@ -33,6 +35,8 @@ use peerdb::score::{PeerAction, ReportSource}; pub use peerdb::sync_status::{SyncInfo, SyncStatus}; use std::collections::{hash_map::Entry, HashMap}; use std::net::IpAddr; +use strum::IntoEnumIterator; + pub mod config; mod network_behaviour; @@ -464,19 +468,6 @@ impl PeerManager { "observed_address" => ?info.observed_addr, "protocols" => ?info.protocols ); - - // update the peer client kind metric if the peer is connected - if matches!( - peer_info.connection_status(), - PeerConnectionStatus::Connected { .. } - | PeerConnectionStatus::Disconnecting { .. } - ) { - metrics::inc_gauge_vec( - &metrics::PEERS_PER_CLIENT, - &[peer_info.client().kind.as_ref()], - ); - metrics::dec_gauge_vec(&metrics::PEERS_PER_CLIENT, &[previous_kind.as_ref()]); - } } } else { error!(self.log, "Received an Identify response from an unknown peer"; "peer_id" => peer_id.to_string()); @@ -812,11 +803,8 @@ impl PeerManager { // start a ping and status timer for the peer self.status_peers.insert(*peer_id); - let connected_peers = self.network_globals.connected_peers() as i64; - // increment prometheus metrics metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); - metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers); true } @@ -1267,6 +1255,70 @@ impl PeerManager { ); } } + + // Update peer count related metrics. + fn update_peer_count_metrics(&self) { + let mut peers_connected = 0; + let mut clients_per_peer = HashMap::new(); + let mut peers_connected_mutli: HashMap<(&str, &str), i32> = HashMap::new(); + + for (_, peer_info) in self.network_globals.peers.read().connected_peers() { + peers_connected += 1; + + *clients_per_peer + .entry(peer_info.client().kind.to_string()) + .or_default() += 1; + + let direction = match peer_info.connection_direction() { + Some(ConnectionDirection::Incoming) => "inbound", + Some(ConnectionDirection::Outgoing) => "outbound", + None => "none", + }; + // Note: the `transport` is set to `unknown` if the `listening_addresses` list is empty. + // This situation occurs when the peer is initially registered in PeerDB, but the peer + // info has not yet been updated at `PeerManager::identify`. + let transport = peer_info + .listening_addresses() + .iter() + .find_map(|addr| { + addr.iter().find_map(|proto| match proto { + multiaddr::Protocol::QuicV1 => Some("quic"), + multiaddr::Protocol::Tcp(_) => Some("tcp"), + _ => None, + }) + }) + .unwrap_or("unknown"); + *peers_connected_mutli + .entry((direction, transport)) + .or_default() += 1; + } + + // PEERS_CONNECTED + metrics::set_gauge(&metrics::PEERS_CONNECTED, peers_connected); + + // PEERS_PER_CLIENT + for client_kind in ClientKind::iter() { + let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0); + metrics::set_gauge_vec( + &metrics::PEERS_PER_CLIENT, + &[client_kind.as_ref()], + *value as i64, + ); + } + + // PEERS_CONNECTED_MULTI + for direction in ["inbound", "outbound", "none"] { + for transport in ["quic", "tcp", "unknown"] { + metrics::set_gauge_vec( + &metrics::PEERS_CONNECTED_MULTI, + &[direction, transport], + *peers_connected_mutli + .get(&(direction, transport)) + .unwrap_or(&0) as i64, + ); + } + } + } } enum ConnectingType { diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index 5dda78a0135..8e105c0ccab 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -4,7 +4,7 @@ use std::net::IpAddr; use std::task::{Context, Poll}; use futures::StreamExt; -use libp2p::core::{multiaddr, ConnectedPoint}; +use libp2p::core::ConnectedPoint; use libp2p::identity::PeerId; use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; @@ -243,35 +243,11 @@ impl PeerManager { self.events.push(PeerManagerEvent::MetaData(peer_id)); } - // increment prometheus metrics + // Update the prometheus metrics if self.metrics_enabled { - let remote_addr = endpoint.get_remote_address(); - let direction = if endpoint.is_dialer() { - "outbound" - } else { - "inbound" - }; - - match remote_addr.iter().find(|proto| { - matches!( - proto, - multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_) - ) - }) { - Some(multiaddr::Protocol::QuicV1) => { - metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]); - } - Some(multiaddr::Protocol::Tcp(_)) => { - metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]); - } - Some(_) => unreachable!(), - None => { - error!(self.log, "Connection established via unknown transport"; "addr" => %remote_addr) - } - }; - - metrics::inc_gauge(&metrics::PEERS_CONNECTED); metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT); + + self.update_peer_count_metrics(); } // Count dialing peers in the limit if the peer dialed us. @@ -309,7 +285,7 @@ impl PeerManager { fn on_connection_closed( &mut self, peer_id: PeerId, - endpoint: &ConnectedPoint, + _endpoint: &ConnectedPoint, remaining_established: usize, ) { if remaining_established > 0 { @@ -337,33 +313,12 @@ impl PeerManager { // reference so that peer manager can track this peer. self.inject_disconnect(&peer_id); - let remote_addr = endpoint.get_remote_address(); // Update the prometheus metrics if self.metrics_enabled { - let direction = if endpoint.is_dialer() { - "outbound" - } else { - "inbound" - }; - - match remote_addr.iter().find(|proto| { - matches!( - proto, - multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_) - ) - }) { - Some(multiaddr::Protocol::QuicV1) => { - metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]); - } - Some(multiaddr::Protocol::Tcp(_)) => { - metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]); - } - // If it's an unknown protocol we already logged when connection was established. - _ => {} - }; // Legacy standard metrics. - metrics::dec_gauge(&metrics::PEERS_CONNECTED); metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT); + + self.update_peer_count_metrics(); } }