Skip to content

Commit

Permalink
fix(network): discovery performs kad queries even if not connected to…
Browse files Browse the repository at this point in the history
… bootstrap (#2142)
  • Loading branch information
ShahakShama authored Dec 1, 2024
1 parent 6f268c8 commit 5623a90
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 161 deletions.
87 changes: 40 additions & 47 deletions crates/papyrus_network/src/discovery/discovery_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::task::{Context, Poll};
use std::time::Duration;

use assert_matches::assert_matches;
use futures::future::pending;
use futures::{FutureExt, Stream, StreamExt};
use libp2p::core::{ConnectedPoint, Endpoint};
use libp2p::swarm::behaviour::ConnectionEstablished;
Expand All @@ -19,16 +18,10 @@ use libp2p::swarm::{
ToSwarm,
};
use libp2p::{Multiaddr, PeerId};
use tokio::select;
use tokio::sync::Mutex;
use tokio::time::timeout;
use void::Void;

use super::kad_impl::KadToOtherBehaviourEvent;
use super::{Behaviour, DiscoveryConfig, RetryConfig, ToOtherBehaviourEvent};
use crate::mixed_behaviour;
use crate::mixed_behaviour::BridgedBehaviour;
use crate::test_utils::next_on_mutex_stream;

const TIMEOUT: Duration = Duration::from_secs(1);
const BOOTSTRAP_DIAL_SLEEP_MILLIS: u64 = 1000; // 1 second
Expand Down Expand Up @@ -56,6 +49,8 @@ impl Stream for Behaviour {
}
}

// TODO(shahak): Make the tests resilient to the order of events.

// In case we have a bug when we return pending and then return an event.
const TIMES_TO_CHECK_FOR_PENDING_EVENT: usize = 5;

Expand All @@ -66,7 +61,7 @@ fn assert_no_event(behaviour: &mut Behaviour) {
}

#[tokio::test]
async fn discovery_outputs_dial_request_on_start_without_query() {
async fn discovery_outputs_dial_request_and_query_on_start() {
let bootstrap_peer_id = PeerId::random();
let bootstrap_peer_address = Multiaddr::empty();

Expand All @@ -78,6 +73,12 @@ async fn discovery_outputs_dial_request_on_start_without_query() {
ToSwarm::Dial{opts} if opts.get_peer_id() == Some(bootstrap_peer_id)
);

let event = timeout(TIMEOUT, behaviour.next()).await.unwrap().unwrap();
assert_matches!(
event,
ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(_peer_id))
);

assert_no_event(&mut behaviour);
}

Expand All @@ -103,14 +104,19 @@ async fn discovery_redials_on_dial_failure() {
let bootstrap_peer_id = PeerId::random();
let bootstrap_peer_address = Multiaddr::empty();

let mut behaviour = Behaviour::new(CONFIG, bootstrap_peer_id, bootstrap_peer_address);
let mut config = CONFIG.clone();
config.heartbeat_interval = BOOTSTRAP_DIAL_SLEEP * 2;
let mut behaviour = Behaviour::new(config, bootstrap_peer_id, bootstrap_peer_address);

let event = timeout(TIMEOUT, behaviour.next()).await.unwrap().unwrap();
assert_matches!(
event,
ToSwarm::Dial{opts} if opts.get_peer_id() == Some(bootstrap_peer_id)
);

// Consume the first query event.
behaviour.next().await.unwrap();

behaviour.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id: Some(bootstrap_peer_id),
error: &DialError::Aborted,
Expand Down Expand Up @@ -229,66 +235,53 @@ async fn discovery_outputs_single_query_after_connecting() {
}

#[tokio::test]
async fn discovery_outputs_single_query_on_query_finished() {
let mut behaviour = create_behaviour_and_connect_to_bootstrap_node(CONFIG).await;
async fn discovery_sleeps_between_queries() {
let mut config = CONFIG;
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
config.heartbeat_interval = HEARTBEAT_INTERVAL;

let mut behaviour = create_behaviour_and_connect_to_bootstrap_node(config).await;

// Consume the initial query event.
timeout(TIMEOUT, behaviour.next()).await.unwrap();

behaviour.on_other_behaviour_event(&mixed_behaviour::ToOtherBehaviourEvent::Kad(
KadToOtherBehaviourEvent::KadQueryFinished,
));
let event = timeout(TIMEOUT, behaviour.next()).await.unwrap().unwrap();
let event = check_event_happens_after_given_duration(&mut behaviour, HEARTBEAT_INTERVAL).await;
assert_matches!(
event,
ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(_peer_id))
);
}

#[tokio::test]
async fn discovery_sleeps_between_queries() {
async fn discovery_performs_queries_even_if_not_connected_to_bootstrap_peer() {
let mut config = CONFIG;
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
const BOOTSTRAP_DIAL_SLEEP: Duration = Duration::from_secs(5);
config.heartbeat_interval = HEARTBEAT_INTERVAL;
config.bootstrap_dial_retry_config.base_delay_millis =
BOOTSTRAP_DIAL_SLEEP.as_millis().try_into().unwrap();
config.bootstrap_dial_retry_config.max_delay_seconds = BOOTSTRAP_DIAL_SLEEP;

let mut behaviour = create_behaviour_and_connect_to_bootstrap_node(config).await;
let bootstrap_peer_id = PeerId::random();
let bootstrap_peer_address = Multiaddr::empty();

// Consume the initial query event.
let mut behaviour = Behaviour::new(config, bootstrap_peer_id, bootstrap_peer_address.clone());

// Consume the initial dial and query events.
timeout(TIMEOUT, behaviour.next()).await.unwrap();
timeout(TIMEOUT, behaviour.next()).await.unwrap();

// Report that the query has finished
behaviour.on_other_behaviour_event(&mixed_behaviour::ToOtherBehaviourEvent::Kad(
KadToOtherBehaviourEvent::KadQueryFinished,
));
// Simulate dial failure.
behaviour.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id: Some(bootstrap_peer_id),
error: &DialError::Aborted,
connection_id: ConnectionId::new_unchecked(0),
}));

// Check that we get a new Kad query after HEARTBEAT_INTERVAL.
let event = check_event_happens_after_given_duration(&mut behaviour, HEARTBEAT_INTERVAL).await;
assert_matches!(
event,
ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(_peer_id))
);
}

#[tokio::test]
async fn discovery_awakes_on_query_finished() {
let mut behaviour = create_behaviour_and_connect_to_bootstrap_node(CONFIG).await;

// Consume the initial query event.
timeout(TIMEOUT, behaviour.next()).await.unwrap();

let mutex = Mutex::new(behaviour);

select! {
_ = async {
mutex.lock().await.on_other_behaviour_event(
&mixed_behaviour::ToOtherBehaviourEvent::Kad(
KadToOtherBehaviourEvent::KadQueryFinished,
)
);
timeout(TIMEOUT, pending::<()>()).await.unwrap();
} => {},
maybe_event = next_on_mutex_stream(&mutex) => assert_matches!(
maybe_event.unwrap(),
ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(_peer_id))
),
}
}
28 changes: 4 additions & 24 deletions crates/papyrus_network/src/discovery/kad_impl.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,16 @@
use libp2p::kad;
use tracing::{error, info};
use tracing::info;

use super::identify_impl::IdentifyToOtherBehaviourEvent;
use crate::mixed_behaviour::BridgedBehaviour;
use crate::{mixed_behaviour, peer_manager};

#[derive(Debug)]
pub enum KadToOtherBehaviourEvent {
KadQueryFinished,
}
pub enum KadToOtherBehaviourEvent {}

impl From<kad::Event> for mixed_behaviour::Event {
fn from(event: kad::Event) -> Self {
match event {
kad::Event::OutboundQueryProgressed {
id: _,
result: kad::QueryResult::GetClosestPeers(result),
..
} => {
if let Err(err) = result {
error!("Kademlia query failed on {err:?}");
}
mixed_behaviour::Event::ToOtherBehaviourEvent(
mixed_behaviour::ToOtherBehaviourEvent::Kad(
KadToOtherBehaviourEvent::KadQueryFinished,
),
)
}
_ => mixed_behaviour::Event::ToOtherBehaviourEvent(
mixed_behaviour::ToOtherBehaviourEvent::NoOp,
),
}
fn from(_event: kad::Event) -> Self {
mixed_behaviour::Event::ToOtherBehaviourEvent(mixed_behaviour::ToOtherBehaviourEvent::NoOp)
}
}

Expand Down
112 changes: 54 additions & 58 deletions crates/papyrus_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ pub mod identify_impl;
pub mod kad_impl;

use std::collections::BTreeMap;
use std::task::{ready, Context, Poll, Waker};
use std::task::{ready, Context, Poll};
use std::time::Duration;

use futures::future::BoxFuture;
use futures::future::{pending, select, BoxFuture, Either};
use futures::{pin_mut, Future, FutureExt};
use kad_impl::KadToOtherBehaviourEvent;
use libp2p::core::Endpoint;
use libp2p::swarm::behaviour::ConnectionEstablished;
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
Expand Down Expand Up @@ -42,16 +41,13 @@ use crate::mixed_behaviour::BridgedBehaviour;

pub struct Behaviour {
config: DiscoveryConfig,
// TODO(shahak): Consider running several queries in parallel
is_query_running: bool,
bootstrap_peer_address: Multiaddr,
bootstrap_peer_id: PeerId,
is_dialing_to_bootstrap_peer: bool,
// This needs to be boxed to allow polling it from a &mut.
sleep_future_for_dialing_bootstrap_peer: Option<BoxFuture<'static, ()>>,
is_connected_to_bootstrap_peer: bool,
is_bootstrap_in_kad_routing_table: bool,
wakers_waiting_for_query_to_finish: Vec<Waker>,
bootstrap_dial_retry_strategy: ExponentialBackoff,
query_sleep_future: Option<BoxFuture<'static, ()>>,
}
Expand Down Expand Up @@ -94,8 +90,6 @@ impl NetworkBehaviour for Behaviour {
self.is_dialing_to_bootstrap_peer = false;
// In case the reason for the failure is persistent (e.g. the bootstrap peer
// is down), we sleep before redialing.
// TODO(shahak): Consider increasing the time after each failure, the same way we
// do in starknet client.
self.sleep_future_for_dialing_bootstrap_peer = Some(
tokio::time::sleep(self.bootstrap_dial_retry_strategy.next().expect(
"Dial sleep strategy ended even though it's an infinite iterator.",
Expand All @@ -118,6 +112,7 @@ impl NetworkBehaviour for Behaviour {
}) if peer_id == self.bootstrap_peer_id && remaining_established == 0 => {
self.is_connected_to_bootstrap_peer = false;
self.is_dialing_to_bootstrap_peer = false;
self.is_bootstrap_in_kad_routing_table = false;
}
FromSwarm::AddressChange(AddressChange { peer_id, .. })
if peer_id == self.bootstrap_peer_id =>
Expand All @@ -141,29 +136,7 @@ impl NetworkBehaviour for Behaviour {
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, <Self::ConnectionHandler as ConnectionHandler>::FromBehaviour>>
{
if !self.is_dialing_to_bootstrap_peer && !self.is_connected_to_bootstrap_peer {
if let Some(sleep_future) = &mut self.sleep_future_for_dialing_bootstrap_peer {
pin_mut!(sleep_future);
ready!(sleep_future.poll(cx));
}
self.is_dialing_to_bootstrap_peer = true;
self.sleep_future_for_dialing_bootstrap_peer = None;
return Poll::Ready(ToSwarm::Dial {
opts: DialOpts::peer_id(self.bootstrap_peer_id)
.addresses(vec![self.bootstrap_peer_address.clone()])
// The peer manager might also be dialing to the bootstrap node.
.condition(PeerCondition::DisconnectedAndNotDialing)
.build(),
});
}

// If we're not connected to any node, then each Kademlia query we make will automatically
// return without any peers. Running queries in that mode will add unnecessary overload to
// the swarm.
if !self.is_connected_to_bootstrap_peer {
return Poll::Pending;
}
if !self.is_bootstrap_in_kad_routing_table {
if self.is_connected_to_bootstrap_peer && !self.is_bootstrap_in_kad_routing_table {
self.is_bootstrap_in_kad_routing_table = true;
return Poll::Ready(ToSwarm::GenerateEvent(
ToOtherBehaviourEvent::FoundListenAddresses {
Expand All @@ -173,19 +146,55 @@ impl NetworkBehaviour for Behaviour {
));
}

if self.is_query_running {
self.wakers_waiting_for_query_to_finish.push(cx.waker().clone());
return Poll::Pending;
}
if let Some(sleep_future) = &mut self.query_sleep_future {
pin_mut!(sleep_future);
ready!(sleep_future.poll(cx));
}
self.is_query_running = true;
self.query_sleep_future = None;
Poll::Ready(ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(
libp2p::identity::PeerId::random(),
)))
// Destructure self so that the two futures below can each mutably borrow different fields of self.
let Self {
is_dialing_to_bootstrap_peer,
is_connected_to_bootstrap_peer,
sleep_future_for_dialing_bootstrap_peer,
bootstrap_peer_id,
bootstrap_peer_address,
query_sleep_future,
config,
..
} = self;

let bootstrap_dial_future = async move {
if !(*is_dialing_to_bootstrap_peer) && !(*is_connected_to_bootstrap_peer) {
if let Some(sleep_future) = sleep_future_for_dialing_bootstrap_peer {
sleep_future.await;
}
*is_dialing_to_bootstrap_peer = true;
*sleep_future_for_dialing_bootstrap_peer = None;
return ToSwarm::Dial {
opts: DialOpts::peer_id(*bootstrap_peer_id)
.addresses(vec![bootstrap_peer_address.clone()])
// The peer manager might also be dialing to the bootstrap node.
.condition(PeerCondition::DisconnectedAndNotDialing)
.build(),
};
}
// We're either already connected to the bootstrap peer or currently dialing it. Nothing to do.
// TODO: register a waker here and wake it when we receive an event that we've
// disconnected from the bootstrap peer.
pending().await
};
pin_mut!(bootstrap_dial_future);
let kad_future = async move {
if let Some(sleep_future) = query_sleep_future {
sleep_future.await;
}
*query_sleep_future = Some(tokio::time::sleep(config.heartbeat_interval).boxed());
ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(
libp2p::identity::PeerId::random(),
))
};
pin_mut!(kad_future);

// Poll both futures together, since each of them may be waiting on a sleep.
let select_future = select(bootstrap_dial_future, kad_future);
pin_mut!(select_future);
let (Either::Left((event, _)) | Either::Right((event, _))) = ready!(select_future.poll(cx));
Poll::Ready(event)
}
}

Expand Down Expand Up @@ -279,14 +288,12 @@ impl Behaviour {
let bootstrap_dial_retry_strategy = config.bootstrap_dial_retry_config.strategy();
Self {
config,
is_query_running: false,
bootstrap_peer_id,
bootstrap_peer_address,
is_dialing_to_bootstrap_peer: false,
sleep_future_for_dialing_bootstrap_peer: None,
is_connected_to_bootstrap_peer: false,
is_bootstrap_in_kad_routing_table: false,
wakers_waiting_for_query_to_finish: Vec::new(),
bootstrap_dial_retry_strategy,
query_sleep_future: None,
}
Expand All @@ -312,16 +319,5 @@ impl From<ToOtherBehaviourEvent> for mixed_behaviour::Event {
}

impl BridgedBehaviour for Behaviour {
fn on_other_behaviour_event(&mut self, event: &mixed_behaviour::ToOtherBehaviourEvent) {
let mixed_behaviour::ToOtherBehaviourEvent::Kad(KadToOtherBehaviourEvent::KadQueryFinished) =
event
else {
return;
};
for waker in self.wakers_waiting_for_query_to_finish.drain(..) {
waker.wake();
}
self.query_sleep_future = Some(tokio::time::sleep(self.config.heartbeat_interval).boxed());
self.is_query_running = false;
}
fn on_other_behaviour_event(&mut self, _event: &mixed_behaviour::ToOtherBehaviourEvent) {}
}
Loading

0 comments on commit 5623a90

Please sign in to comment.