Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove manual poll of the libp2p Swarm #6550

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 127 additions & 130 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ use slog::{crit, debug, info, o, trace, warn};
use std::num::{NonZeroU8, NonZeroUsize};
use std::path::PathBuf;
use std::pin::Pin;
use std::{
sync::Arc,
task::{Context, Poll},
};
use std::sync::Arc;
use types::{
consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, EnrForkId, EthSpec, ForkContext, Slot, SubnetId,
};
Expand Down Expand Up @@ -1794,148 +1791,148 @@ impl<E: EthSpec> Network<E> {

/* Networking polling */

/// Poll the p2p networking stack.
///
/// This will poll the swarm and do maintenance routines.
pub fn poll_network(&mut self, cx: &mut Context) -> Poll<NetworkEvent<E>> {
while let Poll::Ready(Some(swarm_event)) = self.swarm.poll_next_unpin(cx) {
let maybe_event = match swarm_event {
SwarmEvent::Behaviour(behaviour_event) => match behaviour_event {
// Handle sub-behaviour events.
BehaviourEvent::Gossipsub(ge) => self.inject_gs_event(ge),
BehaviourEvent::Eth2Rpc(re) => self.inject_rpc_event(re),
// Inform the peer manager about discovered peers.
//
// The peer manager will subsequently decide which peers need to be dialed and then dial
// them.
BehaviourEvent::Discovery(DiscoveredPeers { peers }) => {
self.peer_manager_mut().peers_discovered(peers);
None
}
BehaviourEvent::Identify(ie) => self.inject_identify_event(ie),
BehaviourEvent::PeerManager(pe) => self.inject_pm_event(pe),
BehaviourEvent::Upnp(e) => {
self.inject_upnp_event(e);
None
pub async fn next_event(&mut self) -> NetworkEvent<E> {
loop {
tokio::select! {
// Poll the libp2p `Swarm`.
// This will poll the swarm and do maintenance routines.
Some(event) = self.swarm.next() => {
if let Some(event) = self.parse_swarm_event(event) {
return event;
}
#[allow(unreachable_patterns)]
BehaviourEvent::ConnectionLimits(le) => void::unreachable(le),
},
SwarmEvent::ConnectionEstablished { .. } => None,
SwarmEvent::ConnectionClosed { .. } => None,
SwarmEvent::IncomingConnection {
local_addr,
send_back_addr,
connection_id: _,
} => {
trace!(self.log, "Incoming connection"; "our_addr" => %local_addr, "from" => %send_back_addr);
None

// perform gossipsub score updates when necessary
_ = self.update_gossipsub_scores.tick() => {
let this = self.swarm.behaviour_mut();
this.peer_manager.update_gossipsub_scores(&this.gossipsub);
}
SwarmEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error,
connection_id: _,
} => {
let error_repr = match error {
libp2p::swarm::ListenError::Aborted => {
"Incoming connection aborted".to_string()
}
libp2p::swarm::ListenError::WrongPeerId { obtained, endpoint } => {
format!("Wrong peer id, obtained {obtained}, endpoint {endpoint:?}")
}
libp2p::swarm::ListenError::LocalPeerId { endpoint } => {
format!("Dialing local peer id {endpoint:?}")
}
libp2p::swarm::ListenError::Denied { cause } => {
format!("Connection was denied with cause: {cause:?}")
// poll the gossipsub cache to clear expired messages
Some(result) = self.gossip_cache.next() => {
match result {
Err(e) => warn!(self.log, "Gossip cache error"; "error" => e),
Ok(expired_topic) => {
if let Some(v) = metrics::get_int_counter(
&metrics::GOSSIP_EXPIRED_LATE_PUBLISH_PER_TOPIC_KIND,
&[expired_topic.kind().as_ref()],
) {
v.inc()
};
}
libp2p::swarm::ListenError::Transport(t) => match t {
libp2p::TransportError::MultiaddrNotSupported(m) => {
format!("Transport error: Multiaddr not supported: {m}")
}
libp2p::TransportError::Other(e) => {
format!("Transport error: other: {e}")
}
},
};
debug!(self.log, "Failed incoming connection"; "our_addr" => %local_addr, "from" => %send_back_addr, "error" => error_repr);
None
}
}
SwarmEvent::OutgoingConnectionError {
peer_id: _,
error: _,
connection_id: _,
} => {
// The Behaviour event is more general than the swarm event here. It includes
// connection failures. So we use that log for now, in the peer manager
// behaviour implementation.
}
}
}

fn parse_swarm_event(
&mut self,
event: SwarmEvent<BehaviourEvent<E>>,
) -> Option<NetworkEvent<E>> {
match event {
SwarmEvent::Behaviour(behaviour_event) => match behaviour_event {
// Handle sub-behaviour events.
BehaviourEvent::Gossipsub(ge) => self.inject_gs_event(ge),
BehaviourEvent::Eth2Rpc(re) => self.inject_rpc_event(re),
// Inform the peer manager about discovered peers.
//
// The peer manager will subsequently decide which peers need to be dialed and then dial
// them.
BehaviourEvent::Discovery(DiscoveredPeers { peers }) => {
self.peer_manager_mut().peers_discovered(peers);
None
}
SwarmEvent::NewListenAddr { address, .. } => {
Some(NetworkEvent::NewListenAddr(address))
}
SwarmEvent::ExpiredListenAddr { address, .. } => {
debug!(self.log, "Listen address expired"; "address" => %address);
BehaviourEvent::Identify(ie) => self.inject_identify_event(ie),
BehaviourEvent::PeerManager(pe) => self.inject_pm_event(pe),
BehaviourEvent::Upnp(e) => {
self.inject_upnp_event(e);
None
}
SwarmEvent::ListenerClosed {
addresses, reason, ..
} => {
match reason {
Ok(_) => {
debug!(self.log, "Listener gracefully closed"; "addresses" => ?addresses)
#[allow(unreachable_patterns)]
BehaviourEvent::ConnectionLimits(le) => void::unreachable(le),
},
SwarmEvent::ConnectionEstablished { .. } => None,
SwarmEvent::ConnectionClosed { .. } => None,
SwarmEvent::IncomingConnection {
local_addr,
send_back_addr,
connection_id: _,
} => {
trace!(self.log, "Incoming connection"; "our_addr" => %local_addr, "from" => %send_back_addr);
None
}
SwarmEvent::IncomingConnectionError {
local_addr,
send_back_addr,
error,
connection_id: _,
} => {
let error_repr = match error {
libp2p::swarm::ListenError::Aborted => {
"Incoming connection aborted".to_string()
}
libp2p::swarm::ListenError::WrongPeerId { obtained, endpoint } => {
format!("Wrong peer id, obtained {obtained}, endpoint {endpoint:?}")
}
libp2p::swarm::ListenError::LocalPeerId { endpoint } => {
format!("Dialing local peer id {endpoint:?}")
}
libp2p::swarm::ListenError::Denied { cause } => {
format!("Connection was denied with cause: {cause:?}")
}
libp2p::swarm::ListenError::Transport(t) => match t {
libp2p::TransportError::MultiaddrNotSupported(m) => {
format!("Transport error: Multiaddr not supported: {m}")
}
Err(reason) => {
crit!(self.log, "Listener abruptly closed"; "addresses" => ?addresses, "reason" => ?reason)
libp2p::TransportError::Other(e) => {
format!("Transport error: other: {e}")
}
};
if Swarm::listeners(&self.swarm).count() == 0 {
Some(NetworkEvent::ZeroListeners)
} else {
None
},
};
debug!(self.log, "Failed incoming connection"; "our_addr" => %local_addr, "from" => %send_back_addr, "error" => error_repr);
None
}
SwarmEvent::OutgoingConnectionError {
peer_id: _,
error: _,
connection_id: _,
} => {
// The Behaviour event is more general than the swarm event here. It includes
// connection failures. So we use that log for now, in the peer manager
// behaviour implementation.
None
}
SwarmEvent::NewListenAddr { address, .. } => Some(NetworkEvent::NewListenAddr(address)),
SwarmEvent::ExpiredListenAddr { address, .. } => {
debug!(self.log, "Listen address expired"; "address" => %address);
None
}
SwarmEvent::ListenerClosed {
addresses, reason, ..
} => {
match reason {
Ok(_) => {
debug!(self.log, "Listener gracefully closed"; "addresses" => ?addresses)
}
}
SwarmEvent::ListenerError { error, .. } => {
debug!(self.log, "Listener closed connection attempt"; "reason" => ?error);
None
}
_ => {
// NOTE: SwarmEvent is a non exhaustive enum so updates should be based on
// release notes more than compiler feedback
Err(reason) => {
crit!(self.log, "Listener abruptly closed"; "addresses" => ?addresses, "reason" => ?reason)
}
};
if Swarm::listeners(&self.swarm).count() == 0 {
Some(NetworkEvent::ZeroListeners)
} else {
None
}
};

if let Some(ev) = maybe_event {
return Poll::Ready(ev);
}
}

// perform gossipsub score updates when necessary
while self.update_gossipsub_scores.poll_tick(cx).is_ready() {
let this = self.swarm.behaviour_mut();
this.peer_manager.update_gossipsub_scores(&this.gossipsub);
}

// poll the gossipsub cache to clear expired messages
while let Poll::Ready(Some(result)) = self.gossip_cache.poll_next_unpin(cx) {
match result {
Err(e) => warn!(self.log, "Gossip cache error"; "error" => e),
Ok(expired_topic) => {
if let Some(v) = metrics::get_int_counter(
&metrics::GOSSIP_EXPIRED_LATE_PUBLISH_PER_TOPIC_KIND,
&[expired_topic.kind().as_ref()],
) {
v.inc()
};
}
SwarmEvent::ListenerError { error, .. } => {
debug!(self.log, "Listener closed connection attempt"; "reason" => ?error);
None
}
_ => {
// NOTE: SwarmEvent is a non exhaustive enum so updates should be based on
// release notes more than compiler feedback
None
}
}
Poll::Pending
}

pub async fn next_event(&mut self) -> NetworkEvent<E> {
futures::future::poll_fn(|cx| self.poll_network(cx)).await
}
}
Loading