Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge queue: embarking unstable (eab3672) and #5404 together #5433

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 68 additions & 16 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ pub use libp2p::identity::Keypair;
#[allow(clippy::mutable_key_type)] // PeerId in hashmaps are no longer permitted by clippy
pub mod peerdb;

use crate::peer_manager::peerdb::client::ClientKind;
use libp2p::multiaddr;
pub use peerdb::peer_info::{
ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo,
};
use peerdb::score::{PeerAction, ReportSource};
pub use peerdb::sync_status::{SyncInfo, SyncStatus};
use std::collections::{hash_map::Entry, HashMap};
use std::net::IpAddr;
use strum::IntoEnumIterator;

pub mod config;
mod network_behaviour;

Expand Down Expand Up @@ -464,19 +468,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
"observed_address" => ?info.observed_addr,
"protocols" => ?info.protocols
);

// update the peer client kind metric if the peer is connected
if matches!(
peer_info.connection_status(),
PeerConnectionStatus::Connected { .. }
| PeerConnectionStatus::Disconnecting { .. }
) {
metrics::inc_gauge_vec(
&metrics::PEERS_PER_CLIENT,
&[peer_info.client().kind.as_ref()],
);
metrics::dec_gauge_vec(&metrics::PEERS_PER_CLIENT, &[previous_kind.as_ref()]);
}
}
} else {
error!(self.log, "Received an Identify response from an unknown peer"; "peer_id" => peer_id.to_string());
Expand Down Expand Up @@ -812,11 +803,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// start a ping and status timer for the peer
self.status_peers.insert(*peer_id);

let connected_peers = self.network_globals.connected_peers() as i64;

// increment prometheus metrics
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers);

true
}
Expand Down Expand Up @@ -1267,6 +1255,70 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
);
}
}

// Update peer count related metrics.
fn update_peer_count_metrics(&self) {
let mut peers_connected = 0;
let mut clients_per_peer = HashMap::new();
let mut peers_connected_mutli: HashMap<(&str, &str), i32> = HashMap::new();

for (_, peer_info) in self.network_globals.peers.read().connected_peers() {
peers_connected += 1;

*clients_per_peer
.entry(peer_info.client().kind.to_string())
.or_default() += 1;

let direction = match peer_info.connection_direction() {
Some(ConnectionDirection::Incoming) => "inbound",
Some(ConnectionDirection::Outgoing) => "outbound",
None => "none",
};
// Note: the `transport` is set to `unknown` if the `listening_addresses` list is empty.
// This situation occurs when the peer is initially registered in PeerDB, but the peer
// info has not yet been updated at `PeerManager::identify`.
let transport = peer_info
.listening_addresses()
.iter()
.find_map(|addr| {
addr.iter().find_map(|proto| match proto {
multiaddr::Protocol::QuicV1 => Some("quic"),
multiaddr::Protocol::Tcp(_) => Some("tcp"),
_ => None,
})
})
.unwrap_or("unknown");
*peers_connected_mutli
.entry((direction, transport))
.or_default() += 1;
}

// PEERS_CONNECTED
metrics::set_gauge(&metrics::PEERS_CONNECTED, peers_connected);

// PEERS_PER_CLIENT
for client_kind in ClientKind::iter() {
let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0);
metrics::set_gauge_vec(
&metrics::PEERS_PER_CLIENT,
&[client_kind.as_ref()],
*value as i64,
);
}

// PEERS_CONNECTED_MULTI
for direction in ["inbound", "outbound", "none"] {
for transport in ["quic", "tcp", "unknown"] {
metrics::set_gauge_vec(
&metrics::PEERS_CONNECTED_MULTI,
&[direction, transport],
*peers_connected_mutli
.get(&(direction, transport))
.unwrap_or(&0) as i64,
);
}
}
}
}

enum ConnectingType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::net::IpAddr;
use std::task::{Context, Poll};

use futures::StreamExt;
use libp2p::core::{multiaddr, ConnectedPoint};
use libp2p::core::ConnectedPoint;
use libp2p::identity::PeerId;
use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
Expand Down Expand Up @@ -243,35 +243,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.events.push(PeerManagerEvent::MetaData(peer_id));
}

// increment prometheus metrics
// Update the prometheus metrics
if self.metrics_enabled {
let remote_addr = endpoint.get_remote_address();
let direction = if endpoint.is_dialer() {
"outbound"
} else {
"inbound"
};

match remote_addr.iter().find(|proto| {
matches!(
proto,
multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_)
)
}) {
Some(multiaddr::Protocol::QuicV1) => {
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]);
}
Some(multiaddr::Protocol::Tcp(_)) => {
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]);
}
Some(_) => unreachable!(),
None => {
error!(self.log, "Connection established via unknown transport"; "addr" => %remote_addr)
}
};

metrics::inc_gauge(&metrics::PEERS_CONNECTED);
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);

self.update_peer_count_metrics();
}

// Count dialing peers in the limit if the peer dialed us.
Expand Down Expand Up @@ -309,7 +285,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
fn on_connection_closed(
&mut self,
peer_id: PeerId,
endpoint: &ConnectedPoint,
_endpoint: &ConnectedPoint,
remaining_established: usize,
) {
if remaining_established > 0 {
Expand Down Expand Up @@ -337,33 +313,12 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// reference so that peer manager can track this peer.
self.inject_disconnect(&peer_id);

let remote_addr = endpoint.get_remote_address();
// Update the prometheus metrics
if self.metrics_enabled {
let direction = if endpoint.is_dialer() {
"outbound"
} else {
"inbound"
};

match remote_addr.iter().find(|proto| {
matches!(
proto,
multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_)
)
}) {
Some(multiaddr::Protocol::QuicV1) => {
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]);
}
Some(multiaddr::Protocol::Tcp(_)) => {
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]);
}
// If it's an unknown protocol we already logged when connection was established.
_ => {}
};
// Legacy standard metrics.
metrics::dec_gauge(&metrics::PEERS_CONNECTED);
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);

self.update_peer_count_metrics();
}
}

Expand Down
Loading