diff --git a/crates/papyrus_network/src/discovery/discovery_test.rs b/crates/papyrus_network/src/discovery/discovery_test.rs index 2526a36be3..fbdc94fd88 100644 --- a/crates/papyrus_network/src/discovery/discovery_test.rs +++ b/crates/papyrus_network/src/discovery/discovery_test.rs @@ -5,7 +5,6 @@ use std::task::{Context, Poll}; use std::time::Duration; use assert_matches::assert_matches; -use futures::future::pending; use futures::{FutureExt, Stream, StreamExt}; use libp2p::core::{ConnectedPoint, Endpoint}; use libp2p::swarm::behaviour::ConnectionEstablished; @@ -19,16 +18,10 @@ use libp2p::swarm::{ ToSwarm, }; use libp2p::{Multiaddr, PeerId}; -use tokio::select; -use tokio::sync::Mutex; use tokio::time::timeout; use void::Void; -use super::kad_impl::KadToOtherBehaviourEvent; use super::{Behaviour, DiscoveryConfig, RetryConfig, ToOtherBehaviourEvent}; -use crate::mixed_behaviour; -use crate::mixed_behaviour::BridgedBehaviour; -use crate::test_utils::next_on_mutex_stream; const TIMEOUT: Duration = Duration::from_secs(1); const BOOTSTRAP_DIAL_SLEEP_MILLIS: u64 = 1000; // 1 second @@ -56,6 +49,8 @@ impl Stream for Behaviour { } } +// TODO(shahak): Make the tests resilient to the order of events. + // In case we have a bug when we return pending and then return an event. const TIMES_TO_CHECK_FOR_PENDING_EVENT: usize = 5; @@ -66,7 +61,7 @@ fn assert_no_event(behaviour: &mut Behaviour) { } #[tokio::test] -async fn discovery_outputs_dial_request_on_start_without_query() { +async fn discovery_outputs_dial_request_and_query_on_start() { let bootstrap_peer_id = PeerId::random(); let bootstrap_peer_address = Multiaddr::empty(); @@ -78,6 +73,12 @@ async fn discovery_outputs_dial_request_on_start_without_query() { ToSwarm::Dial{opts} if opts.get_peer_id() == Some(bootstrap_peer_id) ); + let event = timeout(TIMEOUT, behaviour.next()).await.unwrap().unwrap(); + assert_matches!( + event, + ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(_peer_id)) + ); + assert_no_event(&mut behaviour); } @@ -103,7 +104,9 @@ async fn discovery_redials_on_dial_failure() { let bootstrap_peer_id = PeerId::random(); let bootstrap_peer_address = Multiaddr::empty(); - let mut behaviour = Behaviour::new(CONFIG, bootstrap_peer_id, bootstrap_peer_address); + let mut config = CONFIG.clone(); + config.heartbeat_interval = BOOTSTRAP_DIAL_SLEEP * 2; + let mut behaviour = Behaviour::new(config, bootstrap_peer_id, bootstrap_peer_address); let event = timeout(TIMEOUT, behaviour.next()).await.unwrap().unwrap(); assert_matches!( @@ -111,6 +114,9 @@ async fn discovery_redials_on_dial_failure() { ToSwarm::Dial{opts} if opts.get_peer_id() == Some(bootstrap_peer_id) ); + // Consume the first query event. + behaviour.next().await.unwrap(); + behaviour.on_swarm_event(FromSwarm::DialFailure(DialFailure { peer_id: Some(bootstrap_peer_id), error: &DialError::Aborted, @@ -229,16 +235,17 @@ async fn discovery_outputs_single_query_after_connecting() { } #[tokio::test] -async fn discovery_outputs_single_query_on_query_finished() { - let mut behaviour = create_behaviour_and_connect_to_bootstrap_node(CONFIG).await; +async fn discovery_sleeps_between_queries() { + let mut config = CONFIG; + const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1); + config.heartbeat_interval = HEARTBEAT_INTERVAL; + + let mut behaviour = create_behaviour_and_connect_to_bootstrap_node(config).await; // Consume the initial query event. timeout(TIMEOUT, behaviour.next()).await.unwrap(); - behaviour.on_other_behaviour_event(&mixed_behaviour::ToOtherBehaviourEvent::Kad( - KadToOtherBehaviourEvent::KadQueryFinished, - )); - let event = timeout(TIMEOUT, behaviour.next()).await.unwrap().unwrap(); + let event = check_event_happens_after_given_duration(&mut behaviour, HEARTBEAT_INTERVAL).await; assert_matches!( event, ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(_peer_id)) @@ -246,49 +253,35 @@ async fn discovery_outputs_single_query_on_query_finished() { } #[tokio::test] -async fn discovery_sleeps_between_queries() { +async fn discovery_performs_queries_even_if_not_connected_to_bootstrap_peer() { let mut config = CONFIG; const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1); + const BOOTSTRAP_DIAL_SLEEP: Duration = Duration::from_secs(5); config.heartbeat_interval = HEARTBEAT_INTERVAL; + config.bootstrap_dial_retry_config.base_delay_millis = + BOOTSTRAP_DIAL_SLEEP.as_millis().try_into().unwrap(); + config.bootstrap_dial_retry_config.max_delay_seconds = BOOTSTRAP_DIAL_SLEEP; - let mut behaviour = create_behaviour_and_connect_to_bootstrap_node(config).await; + let bootstrap_peer_id = PeerId::random(); + let bootstrap_peer_address = Multiaddr::empty(); - // Consume the initial query event. + let mut behaviour = Behaviour::new(config, bootstrap_peer_id, bootstrap_peer_address.clone()); + + // Consume the initial dial and query events. + timeout(TIMEOUT, behaviour.next()).await.unwrap(); timeout(TIMEOUT, behaviour.next()).await.unwrap(); - // Report that the query has finished - behaviour.on_other_behaviour_event(&mixed_behaviour::ToOtherBehaviourEvent::Kad( - KadToOtherBehaviourEvent::KadQueryFinished, - )); + // Simulate dial failure. + behaviour.on_swarm_event(FromSwarm::DialFailure(DialFailure { + peer_id: Some(bootstrap_peer_id), + error: &DialError::Aborted, + connection_id: ConnectionId::new_unchecked(0), + })); + // Check that we get a new Kad query after HEARTBEAT_INTERVAL. let event = check_event_happens_after_given_duration(&mut behaviour, HEARTBEAT_INTERVAL).await; assert_matches!( event, ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(_peer_id)) ); } - -#[tokio::test] -async fn discovery_awakes_on_query_finished() { - let mut behaviour = create_behaviour_and_connect_to_bootstrap_node(CONFIG).await; - - // Consume the initial query event. - timeout(TIMEOUT, behaviour.next()).await.unwrap(); - - let mutex = Mutex::new(behaviour); - - select! { - _ = async { - mutex.lock().await.on_other_behaviour_event( - &mixed_behaviour::ToOtherBehaviourEvent::Kad( - KadToOtherBehaviourEvent::KadQueryFinished, - ) - ); - timeout(TIMEOUT, pending::<()>()).await.unwrap(); - } => {}, - maybe_event = next_on_mutex_stream(&mutex) => assert_matches!( - maybe_event.unwrap(), - ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(_peer_id)) - ), - } -} diff --git a/crates/papyrus_network/src/discovery/kad_impl.rs b/crates/papyrus_network/src/discovery/kad_impl.rs index 17b72d3063..7bed62af14 100644 --- a/crates/papyrus_network/src/discovery/kad_impl.rs +++ b/crates/papyrus_network/src/discovery/kad_impl.rs @@ -1,36 +1,16 @@ use libp2p::kad; -use tracing::{error, info}; +use tracing::info; use super::identify_impl::IdentifyToOtherBehaviourEvent; use crate::mixed_behaviour::BridgedBehaviour; use crate::{mixed_behaviour, peer_manager}; #[derive(Debug)] -pub enum KadToOtherBehaviourEvent { - KadQueryFinished, -} +pub enum KadToOtherBehaviourEvent {} impl From for mixed_behaviour::Event { - fn from(event: kad::Event) -> Self { - match event { - kad::Event::OutboundQueryProgressed { - id: _, - result: kad::QueryResult::GetClosestPeers(result), - .. - } => { - if let Err(err) = result { - error!("Kademlia query failed on {err:?}"); - } - mixed_behaviour::Event::ToOtherBehaviourEvent( - mixed_behaviour::ToOtherBehaviourEvent::Kad( - KadToOtherBehaviourEvent::KadQueryFinished, - ), - ) - } - _ => mixed_behaviour::Event::ToOtherBehaviourEvent( - mixed_behaviour::ToOtherBehaviourEvent::NoOp, - ), - } + fn from(_event: kad::Event) -> Self { + mixed_behaviour::Event::ToOtherBehaviourEvent(mixed_behaviour::ToOtherBehaviourEvent::NoOp) } } diff --git a/crates/papyrus_network/src/discovery/mod.rs b/crates/papyrus_network/src/discovery/mod.rs index fb37d2e57c..54c5b3896e 100644 --- a/crates/papyrus_network/src/discovery/mod.rs +++ b/crates/papyrus_network/src/discovery/mod.rs @@ -6,12 +6,11 @@ pub mod identify_impl; pub mod kad_impl; use std::collections::BTreeMap; -use std::task::{ready, Context, Poll, Waker}; +use std::task::{ready, Context, Poll}; use std::time::Duration; -use futures::future::BoxFuture; +use futures::future::{pending, select, BoxFuture, Either}; use futures::{pin_mut, Future, FutureExt}; -use kad_impl::KadToOtherBehaviourEvent; use libp2p::core::Endpoint; use libp2p::swarm::behaviour::ConnectionEstablished; use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; @@ -42,8 +41,6 @@ use crate::mixed_behaviour::BridgedBehaviour; pub struct Behaviour { config: DiscoveryConfig, - // TODO(shahak): Consider running several queries in parallel - is_query_running: bool, bootstrap_peer_address: Multiaddr, bootstrap_peer_id: PeerId, is_dialing_to_bootstrap_peer: bool, @@ -51,7 +48,6 @@ pub struct Behaviour { sleep_future_for_dialing_bootstrap_peer: Option>, is_connected_to_bootstrap_peer: bool, is_bootstrap_in_kad_routing_table: bool, - wakers_waiting_for_query_to_finish: Vec, bootstrap_dial_retry_strategy: ExponentialBackoff, query_sleep_future: Option>, } @@ -94,8 +90,6 @@ impl NetworkBehaviour for Behaviour { self.is_dialing_to_bootstrap_peer = false; // For the case that the reason for failure is consistent (e.g the bootstrap peer // is down), we sleep before redialing - // TODO(shahak): Consider increasing the time after each failure, the same way we - // do in starknet client. self.sleep_future_for_dialing_bootstrap_peer = Some( tokio::time::sleep(self.bootstrap_dial_retry_strategy.next().expect( "Dial sleep strategy ended even though it's an infinite iterator.", @@ -118,6 +112,7 @@ impl NetworkBehaviour for Behaviour { }) if peer_id == self.bootstrap_peer_id && remaining_established == 0 => { self.is_connected_to_bootstrap_peer = false; self.is_dialing_to_bootstrap_peer = false; + self.is_bootstrap_in_kad_routing_table = false; } FromSwarm::AddressChange(AddressChange { peer_id, .. }) if peer_id == self.bootstrap_peer_id => @@ -141,29 +136,7 @@ impl NetworkBehaviour for Behaviour { cx: &mut Context<'_>, ) -> Poll::FromBehaviour>> { - if !self.is_dialing_to_bootstrap_peer && !self.is_connected_to_bootstrap_peer { - if let Some(sleep_future) = &mut self.sleep_future_for_dialing_bootstrap_peer { - pin_mut!(sleep_future); - ready!(sleep_future.poll(cx)); - } - self.is_dialing_to_bootstrap_peer = true; - self.sleep_future_for_dialing_bootstrap_peer = None; - return Poll::Ready(ToSwarm::Dial { - opts: DialOpts::peer_id(self.bootstrap_peer_id) - .addresses(vec![self.bootstrap_peer_address.clone()]) - // The peer manager might also be dialing to the bootstrap node. - .condition(PeerCondition::DisconnectedAndNotDialing) - .build(), - }); - } - - // If we're not connected to any node, then each Kademlia query we make will automatically - // return without any peers. Running queries in that mode will add unnecessary overload to - // the swarm. - if !self.is_connected_to_bootstrap_peer { - return Poll::Pending; - } - if !self.is_bootstrap_in_kad_routing_table { + if self.is_connected_to_bootstrap_peer && !self.is_bootstrap_in_kad_routing_table { self.is_bootstrap_in_kad_routing_table = true; return Poll::Ready(ToSwarm::GenerateEvent( ToOtherBehaviourEvent::FoundListenAddresses { @@ -173,19 +146,55 @@ impl NetworkBehaviour for Behaviour { )); } - if self.is_query_running { - self.wakers_waiting_for_query_to_finish.push(cx.waker().clone()); - return Poll::Pending; - } - if let Some(sleep_future) = &mut self.query_sleep_future { - pin_mut!(sleep_future); - ready!(sleep_future.poll(cx)); - } - self.is_query_running = true; - self.query_sleep_future = None; - Poll::Ready(ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery( - libp2p::identity::PeerId::random(), - ))) + // Unpacking self so that we can create 2 futures that use different members of self + let Self { + is_dialing_to_bootstrap_peer, + is_connected_to_bootstrap_peer, + sleep_future_for_dialing_bootstrap_peer, + bootstrap_peer_id, + bootstrap_peer_address, + query_sleep_future, + config, + .. + } = self; + + let bootstrap_dial_future = async move { + if !(*is_dialing_to_bootstrap_peer) && !(*is_connected_to_bootstrap_peer) { + if let Some(sleep_future) = sleep_future_for_dialing_bootstrap_peer { + sleep_future.await; + } + *is_dialing_to_bootstrap_peer = true; + *sleep_future_for_dialing_bootstrap_peer = None; + return ToSwarm::Dial { + opts: DialOpts::peer_id(*bootstrap_peer_id) + .addresses(vec![bootstrap_peer_address.clone()]) + // The peer manager might also be dialing to the bootstrap node. + .condition(PeerCondition::DisconnectedAndNotDialing) + .build(), + }; + } + // We're already connected to the bootstrap peer. Nothing to do + // TODO: register a waker here and wake it when we receive an event that we've + // disconnected from the bootstrap peer. + pending().await + }; + pin_mut!(bootstrap_dial_future); + let kad_future = async move { + if let Some(sleep_future) = query_sleep_future { + sleep_future.await; + } + *query_sleep_future = Some(tokio::time::sleep(config.heartbeat_interval).boxed()); + ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery( + libp2p::identity::PeerId::random(), + )) + }; + pin_mut!(kad_future); + + // polling both futures together since each of them contains sleep. + let select_future = select(bootstrap_dial_future, kad_future); + pin_mut!(select_future); + let (Either::Left((event, _)) | Either::Right((event, _))) = ready!(select_future.poll(cx)); + Poll::Ready(event) } } @@ -279,14 +288,12 @@ impl Behaviour { let bootstrap_dial_retry_strategy = config.bootstrap_dial_retry_config.strategy(); Self { config, - is_query_running: false, bootstrap_peer_id, bootstrap_peer_address, is_dialing_to_bootstrap_peer: false, sleep_future_for_dialing_bootstrap_peer: None, is_connected_to_bootstrap_peer: false, is_bootstrap_in_kad_routing_table: false, - wakers_waiting_for_query_to_finish: Vec::new(), bootstrap_dial_retry_strategy, query_sleep_future: None, } @@ -312,16 +319,5 @@ impl From for mixed_behaviour::Event { } impl BridgedBehaviour for Behaviour { - fn on_other_behaviour_event(&mut self, event: &mixed_behaviour::ToOtherBehaviourEvent) { - let mixed_behaviour::ToOtherBehaviourEvent::Kad(KadToOtherBehaviourEvent::KadQueryFinished) = - event - else { - return; - }; - for waker in self.wakers_waiting_for_query_to_finish.drain(..) { - waker.wake(); - } - self.query_sleep_future = Some(tokio::time::sleep(self.config.heartbeat_interval).boxed()); - self.is_query_running = false; - } + fn on_other_behaviour_event(&mut self, _event: &mixed_behaviour::ToOtherBehaviourEvent) {} } diff --git a/crates/papyrus_network/src/test_utils/mod.rs b/crates/papyrus_network/src/test_utils/mod.rs index 9afbb35347..ebdc3148fb 100644 --- a/crates/papyrus_network/src/test_utils/mod.rs +++ b/crates/papyrus_network/src/test_utils/mod.rs @@ -2,18 +2,13 @@ mod get_stream; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; -use std::pin::Pin; -use std::task::{ready, Context, Poll}; use std::time::Duration; -use futures::future::{Either, Future}; -use futures::pin_mut; -use futures::stream::Stream as StreamTrait; +use futures::future::Either; use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; use libp2p::swarm::{ConnectionId, NetworkBehaviour, Swarm, SwarmEvent}; use libp2p::{PeerId, Stream, StreamProtocol}; use libp2p_swarm_test::SwarmExt; -use tokio::sync::Mutex; use tokio::task::JoinHandle; use tokio_stream::StreamExt; @@ -148,29 +143,3 @@ where } } } - -// I tried making this generic on the async function we run, but it caused a lot of lifetime -// issues. -/// Run `next` on a mutex of a stream, unlocking it while the function is pending. -pub(crate) fn next_on_mutex_stream( - mutex: &Mutex, -) -> NextOnMutexStream<'_, T> { - NextOnMutexStream { mutex } -} - -pub(crate) struct NextOnMutexStream<'a, T: StreamTrait + Unpin> { - mutex: &'a Mutex, -} - -impl<'a, T: StreamTrait + Unpin> Future for NextOnMutexStream<'a, T> { - type Output = Option; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let lock_fut = self.mutex.lock(); - pin_mut!(lock_fut); - let mut locked_value = ready!(lock_fut.poll(cx)); - let fut = StreamExt::next(&mut *locked_value); - pin_mut!(fut); - fut.poll(cx) - } -}