From 8c9582a1ad0710743ea3415b1583e6379fde2465 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 28 Oct 2024 23:00:46 +0000 Subject: [PATCH] remove manual poll for libp2p Swarm, use tokio::select! instead --- .../lighthouse_network/src/service/mod.rs | 257 +++++++++--------- 1 file changed, 127 insertions(+), 130 deletions(-) diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index f3fbd25a90..b23e417adb 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -37,10 +37,7 @@ use slog::{crit, debug, info, o, trace, warn}; use std::num::{NonZeroU8, NonZeroUsize}; use std::path::PathBuf; use std::pin::Pin; -use std::{ - sync::Arc, - task::{Context, Poll}, -}; +use std::sync::Arc; use types::{ consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId, }; @@ -1794,148 +1791,148 @@ impl Network { /* Networking polling */ - /// Poll the p2p networking stack. - /// - /// This will poll the swarm and do maintenance routines. - pub fn poll_network(&mut self, cx: &mut Context) -> Poll> { - while let Poll::Ready(Some(swarm_event)) = self.swarm.poll_next_unpin(cx) { - let maybe_event = match swarm_event { - SwarmEvent::Behaviour(behaviour_event) => match behaviour_event { - // Handle sub-behaviour events. - BehaviourEvent::Gossipsub(ge) => self.inject_gs_event(ge), - BehaviourEvent::Eth2Rpc(re) => self.inject_rpc_event(re), - // Inform the peer manager about discovered peers. - // - // The peer manager will subsequently decide which peers need to be dialed and then dial - // them. - BehaviourEvent::Discovery(DiscoveredPeers { peers }) => { - self.peer_manager_mut().peers_discovered(peers); - None - } - BehaviourEvent::Identify(ie) => self.inject_identify_event(ie), - BehaviourEvent::PeerManager(pe) => self.inject_pm_event(pe), - BehaviourEvent::Upnp(e) => { - self.inject_upnp_event(e); - None + pub async fn next_event(&mut self) -> NetworkEvent { + loop { + tokio::select! { + // Poll the libp2p `Swarm`. + // This will poll the swarm and do maintenance routines. + Some(event) = self.swarm.next() => { + if let Some(event) = self.parse_swarm_event(event) { + return event; } - #[allow(unreachable_patterns)] - BehaviourEvent::ConnectionLimits(le) => void::unreachable(le), }, - SwarmEvent::ConnectionEstablished { .. } => None, - SwarmEvent::ConnectionClosed { .. } => None, - SwarmEvent::IncomingConnection { - local_addr, - send_back_addr, - connection_id: _, - } => { - trace!(self.log, "Incoming connection"; "our_addr" => %local_addr, "from" => %send_back_addr); - None + + // perform gossipsub score updates when necessary + _ = self.update_gossipsub_scores.tick() => { + let this = self.swarm.behaviour_mut(); + this.peer_manager.update_gossipsub_scores(&this.gossipsub); } - SwarmEvent::IncomingConnectionError { - local_addr, - send_back_addr, - error, - connection_id: _, - } => { - let error_repr = match error { - libp2p::swarm::ListenError::Aborted => { - "Incoming connection aborted".to_string() - } - libp2p::swarm::ListenError::WrongPeerId { obtained, endpoint } => { - format!("Wrong peer id, obtained {obtained}, endpoint {endpoint:?}") - } - libp2p::swarm::ListenError::LocalPeerId { endpoint } => { - format!("Dialing local peer id {endpoint:?}") - } - libp2p::swarm::ListenError::Denied { cause } => { - format!("Connection was denied with cause: {cause:?}") + // poll the gossipsub cache to clear expired messages + Some(result) = self.gossip_cache.next() => { + match result { + Err(e) => warn!(self.log, "Gossip cache error"; "error" => e), + Ok(expired_topic) => { + if let Some(v) = metrics::get_int_counter( + &metrics::GOSSIP_EXPIRED_LATE_PUBLISH_PER_TOPIC_KIND, + &[expired_topic.kind().as_ref()], + ) { + v.inc() + }; } - libp2p::swarm::ListenError::Transport(t) => match t { - libp2p::TransportError::MultiaddrNotSupported(m) => { - format!("Transport error: Multiaddr not supported: {m}") - } - libp2p::TransportError::Other(e) => { - format!("Transport error: other: {e}") - } - }, - }; - debug!(self.log, "Failed incoming connection"; "our_addr" => %local_addr, "from" => %send_back_addr, "error" => error_repr); - None + } } - SwarmEvent::OutgoingConnectionError { - peer_id: _, - error: _, - connection_id: _, - } => { - // The Behaviour event is more general than the swarm event here. It includes - // connection failures. So we use that log for now, in the peer manager - // behaviour implementation. + } + } + } + + fn parse_swarm_event( + &mut self, + event: SwarmEvent>, + ) -> Option> { + match event { + SwarmEvent::Behaviour(behaviour_event) => match behaviour_event { + // Handle sub-behaviour events. + BehaviourEvent::Gossipsub(ge) => self.inject_gs_event(ge), + BehaviourEvent::Eth2Rpc(re) => self.inject_rpc_event(re), + // Inform the peer manager about discovered peers. + // + // The peer manager will subsequently decide which peers need to be dialed and then dial + // them. + BehaviourEvent::Discovery(DiscoveredPeers { peers }) => { + self.peer_manager_mut().peers_discovered(peers); None } - SwarmEvent::NewListenAddr { address, .. } => { - Some(NetworkEvent::NewListenAddr(address)) - } - SwarmEvent::ExpiredListenAddr { address, .. } => { - debug!(self.log, "Listen address expired"; "address" => %address); + BehaviourEvent::Identify(ie) => self.inject_identify_event(ie), + BehaviourEvent::PeerManager(pe) => self.inject_pm_event(pe), + BehaviourEvent::Upnp(e) => { + self.inject_upnp_event(e); None } - SwarmEvent::ListenerClosed { - addresses, reason, .. - } => { - match reason { - Ok(_) => { - debug!(self.log, "Listener gracefully closed"; "addresses" => ?addresses) + #[allow(unreachable_patterns)] + BehaviourEvent::ConnectionLimits(le) => void::unreachable(le), + }, + SwarmEvent::ConnectionEstablished { .. } => None, + SwarmEvent::ConnectionClosed { .. } => None, + SwarmEvent::IncomingConnection { + local_addr, + send_back_addr, + connection_id: _, + } => { + trace!(self.log, "Incoming connection"; "our_addr" => %local_addr, "from" => %send_back_addr); + None + } + SwarmEvent::IncomingConnectionError { + local_addr, + send_back_addr, + error, + connection_id: _, + } => { + let error_repr = match error { + libp2p::swarm::ListenError::Aborted => { + "Incoming connection aborted".to_string() + } + libp2p::swarm::ListenError::WrongPeerId { obtained, endpoint } => { + format!("Wrong peer id, obtained {obtained}, endpoint {endpoint:?}") + } + libp2p::swarm::ListenError::LocalPeerId { endpoint } => { + format!("Dialing local peer id {endpoint:?}") + } + libp2p::swarm::ListenError::Denied { cause } => { + format!("Connection was denied with cause: {cause:?}") + } + libp2p::swarm::ListenError::Transport(t) => match t { + libp2p::TransportError::MultiaddrNotSupported(m) => { + format!("Transport error: Multiaddr not supported: {m}") } - Err(reason) => { - crit!(self.log, "Listener abruptly closed"; "addresses" => ?addresses, "reason" => ?reason) + libp2p::TransportError::Other(e) => { + format!("Transport error: other: {e}") } - }; - if Swarm::listeners(&self.swarm).count() == 0 { - Some(NetworkEvent::ZeroListeners) - } else { - None + }, + }; + debug!(self.log, "Failed incoming connection"; "our_addr" => %local_addr, "from" => %send_back_addr, "error" => error_repr); + None + } + SwarmEvent::OutgoingConnectionError { + peer_id: _, + error: _, + connection_id: _, + } => { + // The Behaviour event is more general than the swarm event here. It includes + // connection failures. So we use that log for now, in the peer manager + // behaviour implementation. + None + } + SwarmEvent::NewListenAddr { address, .. } => Some(NetworkEvent::NewListenAddr(address)), + SwarmEvent::ExpiredListenAddr { address, .. } => { + debug!(self.log, "Listen address expired"; "address" => %address); + None + } + SwarmEvent::ListenerClosed { + addresses, reason, .. + } => { + match reason { + Ok(_) => { + debug!(self.log, "Listener gracefully closed"; "addresses" => ?addresses) } - } - SwarmEvent::ListenerError { error, .. } => { - debug!(self.log, "Listener closed connection attempt"; "reason" => ?error); - None - } - _ => { - // NOTE: SwarmEvent is a non exhaustive enum so updates should be based on - // release notes more than compiler feedback + Err(reason) => { + crit!(self.log, "Listener abruptly closed"; "addresses" => ?addresses, "reason" => ?reason) + } + }; + if Swarm::listeners(&self.swarm).count() == 0 { + Some(NetworkEvent::ZeroListeners) + } else { None } - }; - - if let Some(ev) = maybe_event { - return Poll::Ready(ev); } - } - - // perform gossipsub score updates when necessary - while self.update_gossipsub_scores.poll_tick(cx).is_ready() { - let this = self.swarm.behaviour_mut(); - this.peer_manager.update_gossipsub_scores(&this.gossipsub); - } - - // poll the gossipsub cache to clear expired messages - while let Poll::Ready(Some(result)) = self.gossip_cache.poll_next_unpin(cx) { - match result { - Err(e) => warn!(self.log, "Gossip cache error"; "error" => e), - Ok(expired_topic) => { - if let Some(v) = metrics::get_int_counter( - &metrics::GOSSIP_EXPIRED_LATE_PUBLISH_PER_TOPIC_KIND, - &[expired_topic.kind().as_ref()], - ) { - v.inc() - }; - } + SwarmEvent::ListenerError { error, .. } => { + debug!(self.log, "Listener closed connection attempt"; "reason" => ?error); + None + } + _ => { + // NOTE: SwarmEvent is a non exhaustive enum so updates should be based on + // release notes more than compiler feedback + None } } - Poll::Pending - } - - pub async fn next_event(&mut self) -> NetworkEvent { - futures::future::poll_fn(|cx| self.poll_network(cx)).await } }