From 17711b720e22adce6f7afb1ba610d84860a9e6f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Fri, 11 Oct 2024 17:33:49 +0100 Subject: [PATCH] Remove duplicated connection limits checks (#6156) * move main Behaviour to mod.rs for better readibility and remove connection limits checks after connection has been established, as those checks have already been done by connection limits Behaviour. * improve logging wording wrt dial logic when we call dial_peer we are not yet dialing but just adding the peer to the dial queue * do not use a constant for MAX_CONNECTIONS_PER_PEER we only use it at one place, and the function call is explicit. * address review and re-instate connection limits checks, but do it before the connection has been established. * Merge branch 'unstable' of github.com:sigp/lighthouse into remove-dial-error-denied * Merge branch 'unstable' of github.com:sigp/lighthouse into remove-dial-error-denied --- .../src/peer_manager/mod.rs | 18 +--- .../src/peer_manager/network_behaviour.rs | 100 ++++++++---------- .../src/service/behaviour.rs | 39 ------- .../lighthouse_network/src/service/mod.rs | 48 +++++++-- .../lighthouse_network/src/service/utils.rs | 2 - 5 files changed, 90 insertions(+), 117 deletions(-) delete mode 100644 beacon_node/lighthouse_network/src/service/behaviour.rs diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 1f066e9bbcb..ec4d892c9b0 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -338,15 +338,15 @@ impl PeerManager { { // This should be updated with the peer dialing. In fact created once the peer is // dialed + let peer_id = enr.peer_id(); if let Some(min_ttl) = min_ttl { self.network_globals .peers .write() - .update_min_ttl(&enr.peer_id(), min_ttl); + .update_min_ttl(&peer_id, min_ttl); } - let peer_id = enr.peer_id(); if self.dial_peer(enr) { - debug!(self.log, "Dialing discovered peer"; "peer_id" => %peer_id); + debug!(self.log, "Added discovered ENR peer to dial queue"; "peer_id" => %peer_id); to_dial_peers += 1; } } @@ -447,18 +447,6 @@ impl PeerManager { self.network_globals.peers.read().is_connected(peer_id) } - /// Reports whether the peer limit is reached in which case we stop allowing new incoming - /// connections. - pub fn peer_limit_reached(&self, count_dialing: bool) -> bool { - if count_dialing { - // This is an incoming connection so limit by the standard max peers - self.network_globals.connected_or_dialing_peers() >= self.max_peers() - } else { - // We dialed this peer, allow up to max_outbound_dialing_peers - self.network_globals.connected_peers() >= self.max_outbound_dialing_peers() - } - } - /// Updates `PeerInfo` with `identify` information. pub fn identify(&mut self, peer_id: &PeerId, info: &IdentifyInfo) { if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index b7fd5b5e5d7..c40f78b4b08 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -15,7 +15,6 @@ use slog::{debug, error, trace}; use types::EthSpec; use crate::discovery::enr_ext::EnrExt; -use crate::rpc::GoodbyeReason; use crate::types::SyncState; use crate::{metrics, ClearDialError}; @@ -94,26 +93,20 @@ impl NetworkBehaviour for PeerManager { } if let Some(enr) = self.peers_to_dial.pop() { - let peer_id = enr.peer_id(); - self.inject_peer_connection(&peer_id, ConnectingType::Dialing, Some(enr.clone())); - - let quic_multiaddrs = if self.quic_enabled { - let quic_multiaddrs = enr.multiaddr_quic(); - if !quic_multiaddrs.is_empty() { - debug!(self.log, "Dialing QUIC supported peer"; "peer_id"=> %peer_id, "quic_multiaddrs" => ?quic_multiaddrs); - } - quic_multiaddrs - } else { - Vec::new() - }; + self.inject_peer_connection(&enr.peer_id(), ConnectingType::Dialing, Some(enr.clone())); // Prioritize Quic connections over Tcp ones. - let multiaddrs = quic_multiaddrs - .into_iter() - .chain(enr.multiaddr_tcp()) - .collect(); + let multiaddrs = [ + self.quic_enabled + .then_some(enr.multiaddr_quic()) + .unwrap_or_default(), + enr.multiaddr_tcp(), + ] + .concat(); + + debug!(self.log, "Dialing peer"; "peer_id"=> %enr.peer_id(), "multiaddrs" => ?multiaddrs); return Poll::Ready(ToSwarm::Dial { - opts: DialOpts::peer_id(peer_id) + opts: DialOpts::peer_id(enr.peer_id()) .condition(PeerCondition::Disconnected) .addresses(multiaddrs) .build(), @@ -130,14 +123,7 @@ impl NetworkBehaviour for PeerManager { endpoint, other_established, .. - }) => { - // NOTE: We still need to handle the [`ConnectionEstablished`] because the - // [`NetworkBehaviour::handle_established_inbound_connection`] and - // [`NetworkBehaviour::handle_established_outbound_connection`] are fallible. This - // means another behaviour can kill the connection early, and we can't assume a - // peer as connected until this event is received. - self.on_connection_established(peer_id, endpoint, other_established) - } + }) => self.on_connection_established(peer_id, endpoint, other_established), FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, endpoint, @@ -206,6 +192,21 @@ impl NetworkBehaviour for PeerManager { "Connection to peer rejected: peer has a bad score", )); } + + // Check the connection limits + if self.network_globals.connected_or_dialing_peers() >= self.max_peers() + && self + .network_globals + .peers + .read() + .peer_info(&peer_id) + .map_or(true, |peer| !peer.has_future_duty()) + { + return Err(ConnectionDenied::new( + "Connection to peer rejected: too many connections", + )); + } + Ok(ConnectionHandler) } @@ -218,13 +219,26 @@ impl NetworkBehaviour for PeerManager { _port_use: PortUse, ) -> Result, libp2p::swarm::ConnectionDenied> { trace!(self.log, "Outbound connection"; "peer_id" => %peer_id, "multiaddr" => %addr); - match self.ban_status(&peer_id) { - Some(cause) => { - error!(self.log, "Connected a banned peer. Rejecting connection"; "peer_id" => %peer_id); - Err(ConnectionDenied::new(cause)) - } - None => Ok(ConnectionHandler), + if let Some(cause) = self.ban_status(&peer_id) { + error!(self.log, "Connected a banned peer. Rejecting connection"; "peer_id" => %peer_id); + return Err(ConnectionDenied::new(cause)); } + + // Check the connection limits + if self.network_globals.connected_peers() >= self.max_outbound_dialing_peers() + && self + .network_globals + .peers + .read() + .peer_info(&peer_id) + .map_or(true, |peer| !peer.has_future_duty()) + { + return Err(ConnectionDenied::new( + "Connection to peer rejected: too many connections", + )); + } + + Ok(ConnectionHandler) } } @@ -233,7 +247,7 @@ impl PeerManager { &mut self, peer_id: PeerId, endpoint: &ConnectedPoint, - other_established: usize, + _other_established: usize, ) { debug!(self.log, "Connection established"; "peer_id" => %peer_id, "multiaddr" => %endpoint.get_remote_address(), @@ -247,26 +261,6 @@ impl PeerManager { self.update_peer_count_metrics(); } - // Count dialing peers in the limit if the peer dialed us. - let count_dialing = endpoint.is_listener(); - // Check the connection limits - if self.peer_limit_reached(count_dialing) - && self - .network_globals - .peers - .read() - .peer_info(&peer_id) - .map_or(true, |peer| !peer.has_future_duty()) - { - // Gracefully disconnect the peer. - self.disconnect_peer(peer_id, GoodbyeReason::TooManyPeers); - return; - } - - if other_established == 0 { - self.events.push(PeerManagerEvent::MetaData(peer_id)); - } - // NOTE: We don't register peers that we are disconnecting immediately. The network service // does not need to know about these peers. match endpoint { diff --git a/beacon_node/lighthouse_network/src/service/behaviour.rs b/beacon_node/lighthouse_network/src/service/behaviour.rs deleted file mode 100644 index ab2e43630bb..00000000000 --- a/beacon_node/lighthouse_network/src/service/behaviour.rs +++ /dev/null @@ -1,39 +0,0 @@ -use crate::discovery::Discovery; -use crate::peer_manager::PeerManager; -use crate::rpc::RPC; -use crate::types::SnappyTransform; - -use libp2p::identify; -use libp2p::swarm::behaviour::toggle::Toggle; -use libp2p::swarm::NetworkBehaviour; -use libp2p::upnp::tokio::Behaviour as Upnp; -use types::EthSpec; - -use super::api_types::RequestId; - -pub type SubscriptionFilter = - gossipsub::MaxCountSubscriptionFilter; -pub type Gossipsub = gossipsub::Behaviour; - -#[derive(NetworkBehaviour)] -pub(crate) struct Behaviour -where - E: EthSpec, -{ - /// Keep track of active and pending connections to enforce hard limits. - pub connection_limits: libp2p::connection_limits::Behaviour, - /// The peer manager that keeps track of peer's reputation and status. - pub peer_manager: PeerManager, - /// The Eth2 RPC specified in the wire-0 protocol. - pub eth2_rpc: RPC, - /// Discv5 Discovery protocol. - pub discovery: Discovery, - /// Keep regular connection to peers and disconnect if absent. - // NOTE: The id protocol is used for initial interop. This will be removed by mainnet. - /// Provides IP addresses and peer information. - pub identify: identify::Behaviour, - /// Libp2p UPnP port mapping. - pub upnp: Toggle, - /// The routing pub-sub mechanism for eth2. - pub gossipsub: Gossipsub, -} diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 4cf59e15e10..ea4c3acb421 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1,4 +1,3 @@ -use self::behaviour::Behaviour; use self::gossip_cache::GossipCache; use crate::config::{gossipsub_config, GossipsubConfigParams, NetworkLoad}; use crate::discovery::{ @@ -14,8 +13,6 @@ use crate::rpc::{ self, GoodbyeReason, HandlerErr, NetworkParams, Protocol, RPCError, RPCMessage, RPCReceived, RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, RpcSuccessResponse, RPC, }; -use crate::service::behaviour::BehaviourEvent; -pub use crate::service::behaviour::Gossipsub; use crate::types::{ attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS, @@ -33,7 +30,8 @@ use gossipsub::{ use gossipsub_scoring_parameters::{lighthouse_gossip_thresholds, PeerScoreSettings}; use libp2p::multiaddr::{self, Multiaddr, Protocol as MProtocol}; use libp2p::swarm::behaviour::toggle::Toggle; -use libp2p::swarm::{Swarm, SwarmEvent}; +use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p::upnp::tokio::Behaviour as Upnp; use libp2p::{identify, PeerId, SwarmBuilder}; use slog::{crit, debug, info, o, trace, warn}; use std::num::{NonZeroU8, NonZeroUsize}; @@ -47,10 +45,9 @@ use types::{ consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId, }; use types::{ChainSpec, ForkName}; -use utils::{build_transport, strip_peer_id, Context as ServiceContext, MAX_CONNECTIONS_PER_PEER}; +use utils::{build_transport, strip_peer_id, Context as ServiceContext}; pub mod api_types; -mod behaviour; mod gossip_cache; pub mod gossipsub_scoring_parameters; pub mod utils; @@ -109,6 +106,41 @@ pub enum NetworkEvent { ZeroListeners, } +pub type Gossipsub = gossipsub::Behaviour; +pub type SubscriptionFilter = + gossipsub::MaxCountSubscriptionFilter; + +#[derive(NetworkBehaviour)] +pub(crate) struct Behaviour +where + E: EthSpec, +{ + // NOTE: The order of the following list of behaviours has meaning, + // `NetworkBehaviour::handle_{pending, established}_{inbound, outbound}` methods + // are called sequentially for each behaviour and they are fallible, + // therefore we want `connection_limits` and `peer_manager` running first, + // which are the behaviours that may reject a connection, so that + // when the subsequent behaviours are called they are certain the connection won't be rejected. + + // + /// Keep track of active and pending connections to enforce hard limits. + pub connection_limits: libp2p::connection_limits::Behaviour, + /// The peer manager that keeps track of peer's reputation and status. + pub peer_manager: PeerManager, + /// The Eth2 RPC specified in the wire-0 protocol. + pub eth2_rpc: RPC, + /// Discv5 Discovery protocol. + pub discovery: Discovery, + /// Keep regular connection to peers and disconnect if absent. + // NOTE: The id protocol is used for initial interop. This will be removed by mainnet. + /// Provides IP addresses and peer information. + pub identify: identify::Behaviour, + /// Libp2p UPnP port mapping. + pub upnp: Toggle, + /// The routing pub-sub mechanism for eth2. + pub gossipsub: Gossipsub, +} + /// Builds the network behaviour that manages the core protocols of eth2. /// This core behaviour is managed by `Behaviour` which adds peer management to all core /// behaviours. @@ -396,7 +428,7 @@ impl Network { (config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS)) .ceil() as u32, )) - .with_max_established_per_peer(Some(MAX_CONNECTIONS_PER_PEER)); + .with_max_established_per_peer(Some(1)); libp2p::connection_limits::Behaviour::new(limits) }; @@ -1198,7 +1230,7 @@ impl Network { self.discovery_mut().remove_cached_enr(&enr.peer_id()); let peer_id = enr.peer_id(); if self.peer_manager_mut().dial_peer(enr) { - debug!(self.log, "Dialing cached ENR peer"; "peer_id" => %peer_id); + debug!(self.log, "Added cached ENR peer to dial queue"; "peer_id" => %peer_id); } } } diff --git a/beacon_node/lighthouse_network/src/service/utils.rs b/beacon_node/lighthouse_network/src/service/utils.rs index 81ee86b8b9b..f4988e68cd5 100644 --- a/beacon_node/lighthouse_network/src/service/utils.rs +++ b/beacon_node/lighthouse_network/src/service/utils.rs @@ -24,8 +24,6 @@ use types::{ }; pub const NETWORK_KEY_FILENAME: &str = "key"; -/// The maximum simultaneous libp2p connections per peer. -pub const MAX_CONNECTIONS_PER_PEER: u32 = 1; /// The filename to store our local metadata. pub const METADATA_FILENAME: &str = "metadata";