Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(network): discovery performs kad queries even if not connected to bootstrap #2142

Merged
merged 1 commit into from
Dec 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 40 additions & 47 deletions crates/papyrus_network/src/discovery/discovery_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::task::{Context, Poll};
use std::time::Duration;

use assert_matches::assert_matches;
use futures::future::pending;
use futures::{FutureExt, Stream, StreamExt};
use libp2p::core::{ConnectedPoint, Endpoint};
use libp2p::swarm::behaviour::ConnectionEstablished;
Expand All @@ -19,16 +18,10 @@ use libp2p::swarm::{
ToSwarm,
};
use libp2p::{Multiaddr, PeerId};
use tokio::select;
use tokio::sync::Mutex;
use tokio::time::timeout;
use void::Void;

use super::kad_impl::KadToOtherBehaviourEvent;
use super::{Behaviour, DiscoveryConfig, RetryConfig, ToOtherBehaviourEvent};
use crate::mixed_behaviour;
use crate::mixed_behaviour::BridgedBehaviour;
use crate::test_utils::next_on_mutex_stream;

const TIMEOUT: Duration = Duration::from_secs(1);
const BOOTSTRAP_DIAL_SLEEP_MILLIS: u64 = 1000; // 1 second
Expand Down Expand Up @@ -56,6 +49,8 @@ impl Stream for Behaviour {
}
}

// TODO(shahak): Make the tests resilient to the order of events.

// Guards against a bug where we return pending and then return an event afterwards.
const TIMES_TO_CHECK_FOR_PENDING_EVENT: usize = 5;

Expand All @@ -66,7 +61,7 @@ fn assert_no_event(behaviour: &mut Behaviour) {
}

#[tokio::test]
async fn discovery_outputs_dial_request_on_start_without_query() {
async fn discovery_outputs_dial_request_and_query_on_start() {
let bootstrap_peer_id = PeerId::random();
let bootstrap_peer_address = Multiaddr::empty();

Expand All @@ -78,6 +73,12 @@ async fn discovery_outputs_dial_request_on_start_without_query() {
ToSwarm::Dial{opts} if opts.get_peer_id() == Some(bootstrap_peer_id)
);

let event = timeout(TIMEOUT, behaviour.next()).await.unwrap().unwrap();
assert_matches!(
event,
ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(_peer_id))
);

assert_no_event(&mut behaviour);
}

Expand All @@ -103,14 +104,19 @@ async fn discovery_redials_on_dial_failure() {
let bootstrap_peer_id = PeerId::random();
let bootstrap_peer_address = Multiaddr::empty();

let mut behaviour = Behaviour::new(CONFIG, bootstrap_peer_id, bootstrap_peer_address);
let mut config = CONFIG.clone();
config.heartbeat_interval = BOOTSTRAP_DIAL_SLEEP * 2;
let mut behaviour = Behaviour::new(config, bootstrap_peer_id, bootstrap_peer_address);

let event = timeout(TIMEOUT, behaviour.next()).await.unwrap().unwrap();
assert_matches!(
event,
ToSwarm::Dial{opts} if opts.get_peer_id() == Some(bootstrap_peer_id)
);

// Consume the first query event.
behaviour.next().await.unwrap();

behaviour.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id: Some(bootstrap_peer_id),
error: &DialError::Aborted,
Expand Down Expand Up @@ -229,66 +235,53 @@ async fn discovery_outputs_single_query_after_connecting() {
}

#[tokio::test]
async fn discovery_outputs_single_query_on_query_finished() {
let mut behaviour = create_behaviour_and_connect_to_bootstrap_node(CONFIG).await;
async fn discovery_sleeps_between_queries() {
let mut config = CONFIG;
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
config.heartbeat_interval = HEARTBEAT_INTERVAL;

let mut behaviour = create_behaviour_and_connect_to_bootstrap_node(config).await;

// Consume the initial query event.
timeout(TIMEOUT, behaviour.next()).await.unwrap();

behaviour.on_other_behaviour_event(&mixed_behaviour::ToOtherBehaviourEvent::Kad(
KadToOtherBehaviourEvent::KadQueryFinished,
));
let event = timeout(TIMEOUT, behaviour.next()).await.unwrap().unwrap();
let event = check_event_happens_after_given_duration(&mut behaviour, HEARTBEAT_INTERVAL).await;
assert_matches!(
event,
ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(_peer_id))
);
}

#[tokio::test]
async fn discovery_sleeps_between_queries() {
async fn discovery_performs_queries_even_if_not_connected_to_bootstrap_peer() {
let mut config = CONFIG;
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
const BOOTSTRAP_DIAL_SLEEP: Duration = Duration::from_secs(5);
config.heartbeat_interval = HEARTBEAT_INTERVAL;
config.bootstrap_dial_retry_config.base_delay_millis =
BOOTSTRAP_DIAL_SLEEP.as_millis().try_into().unwrap();
config.bootstrap_dial_retry_config.max_delay_seconds = BOOTSTRAP_DIAL_SLEEP;

let mut behaviour = create_behaviour_and_connect_to_bootstrap_node(config).await;
let bootstrap_peer_id = PeerId::random();
let bootstrap_peer_address = Multiaddr::empty();

// Consume the initial query event.
let mut behaviour = Behaviour::new(config, bootstrap_peer_id, bootstrap_peer_address.clone());

// Consume the initial dial and query events.
timeout(TIMEOUT, behaviour.next()).await.unwrap();
timeout(TIMEOUT, behaviour.next()).await.unwrap();

// Report that the query has finished
behaviour.on_other_behaviour_event(&mixed_behaviour::ToOtherBehaviourEvent::Kad(
KadToOtherBehaviourEvent::KadQueryFinished,
));
// Simulate dial failure.
behaviour.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id: Some(bootstrap_peer_id),
error: &DialError::Aborted,
connection_id: ConnectionId::new_unchecked(0),
}));

// Check that we get a new Kad query after HEARTBEAT_INTERVAL.
let event = check_event_happens_after_given_duration(&mut behaviour, HEARTBEAT_INTERVAL).await;
assert_matches!(
event,
ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(_peer_id))
);
}

#[tokio::test]
async fn discovery_awakes_on_query_finished() {
let mut behaviour = create_behaviour_and_connect_to_bootstrap_node(CONFIG).await;

// Consume the initial query event.
timeout(TIMEOUT, behaviour.next()).await.unwrap();

let mutex = Mutex::new(behaviour);

select! {
_ = async {
mutex.lock().await.on_other_behaviour_event(
&mixed_behaviour::ToOtherBehaviourEvent::Kad(
KadToOtherBehaviourEvent::KadQueryFinished,
)
);
timeout(TIMEOUT, pending::<()>()).await.unwrap();
} => {},
maybe_event = next_on_mutex_stream(&mutex) => assert_matches!(
maybe_event.unwrap(),
ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(_peer_id))
),
}
}
28 changes: 4 additions & 24 deletions crates/papyrus_network/src/discovery/kad_impl.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,16 @@
use libp2p::kad;
use tracing::{error, info};
use tracing::info;

use super::identify_impl::IdentifyToOtherBehaviourEvent;
use crate::mixed_behaviour::BridgedBehaviour;
use crate::{mixed_behaviour, peer_manager};

#[derive(Debug)]
pub enum KadToOtherBehaviourEvent {
KadQueryFinished,
}
pub enum KadToOtherBehaviourEvent {}

impl From<kad::Event> for mixed_behaviour::Event {
fn from(event: kad::Event) -> Self {
match event {
kad::Event::OutboundQueryProgressed {
id: _,
result: kad::QueryResult::GetClosestPeers(result),
..
} => {
if let Err(err) = result {
error!("Kademlia query failed on {err:?}");
}
mixed_behaviour::Event::ToOtherBehaviourEvent(
mixed_behaviour::ToOtherBehaviourEvent::Kad(
KadToOtherBehaviourEvent::KadQueryFinished,
),
)
}
_ => mixed_behaviour::Event::ToOtherBehaviourEvent(
mixed_behaviour::ToOtherBehaviourEvent::NoOp,
),
}
fn from(_event: kad::Event) -> Self {
mixed_behaviour::Event::ToOtherBehaviourEvent(mixed_behaviour::ToOtherBehaviourEvent::NoOp)
}
}

Expand Down
112 changes: 54 additions & 58 deletions crates/papyrus_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ pub mod identify_impl;
pub mod kad_impl;

use std::collections::BTreeMap;
use std::task::{ready, Context, Poll, Waker};
use std::task::{ready, Context, Poll};
use std::time::Duration;

use futures::future::BoxFuture;
use futures::future::{pending, select, BoxFuture, Either};
use futures::{pin_mut, Future, FutureExt};
use kad_impl::KadToOtherBehaviourEvent;
use libp2p::core::Endpoint;
use libp2p::swarm::behaviour::ConnectionEstablished;
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
Expand Down Expand Up @@ -42,16 +41,13 @@ use crate::mixed_behaviour::BridgedBehaviour;

pub struct Behaviour {
config: DiscoveryConfig,
// TODO(shahak): Consider running several queries in parallel
is_query_running: bool,
bootstrap_peer_address: Multiaddr,
bootstrap_peer_id: PeerId,
is_dialing_to_bootstrap_peer: bool,
// This needs to be boxed to allow polling it from a &mut.
sleep_future_for_dialing_bootstrap_peer: Option<BoxFuture<'static, ()>>,
is_connected_to_bootstrap_peer: bool,
is_bootstrap_in_kad_routing_table: bool,
wakers_waiting_for_query_to_finish: Vec<Waker>,
bootstrap_dial_retry_strategy: ExponentialBackoff,
query_sleep_future: Option<BoxFuture<'static, ()>>,
}
Expand Down Expand Up @@ -94,8 +90,6 @@ impl NetworkBehaviour for Behaviour {
self.is_dialing_to_bootstrap_peer = false;
// In case the reason for the failure is persistent (e.g. the bootstrap peer
// is down), we sleep before redialing.
// TODO(shahak): Consider increasing the time after each failure, the same way we
// do in starknet client.
self.sleep_future_for_dialing_bootstrap_peer = Some(
tokio::time::sleep(self.bootstrap_dial_retry_strategy.next().expect(
"Dial sleep strategy ended even though it's an infinite iterator.",
Expand All @@ -118,6 +112,7 @@ impl NetworkBehaviour for Behaviour {
}) if peer_id == self.bootstrap_peer_id && remaining_established == 0 => {
self.is_connected_to_bootstrap_peer = false;
self.is_dialing_to_bootstrap_peer = false;
self.is_bootstrap_in_kad_routing_table = false;
}
FromSwarm::AddressChange(AddressChange { peer_id, .. })
if peer_id == self.bootstrap_peer_id =>
Expand All @@ -141,29 +136,7 @@ impl NetworkBehaviour for Behaviour {
cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, <Self::ConnectionHandler as ConnectionHandler>::FromBehaviour>>
{
if !self.is_dialing_to_bootstrap_peer && !self.is_connected_to_bootstrap_peer {
if let Some(sleep_future) = &mut self.sleep_future_for_dialing_bootstrap_peer {
pin_mut!(sleep_future);
ready!(sleep_future.poll(cx));
}
self.is_dialing_to_bootstrap_peer = true;
self.sleep_future_for_dialing_bootstrap_peer = None;
return Poll::Ready(ToSwarm::Dial {
opts: DialOpts::peer_id(self.bootstrap_peer_id)
.addresses(vec![self.bootstrap_peer_address.clone()])
// The peer manager might also be dialing to the bootstrap node.
.condition(PeerCondition::DisconnectedAndNotDialing)
.build(),
});
}

// If we're not connected to any node, then each Kademlia query we make will automatically
// return without any peers. Running queries in that mode will add unnecessary overload to
// the swarm.
if !self.is_connected_to_bootstrap_peer {
return Poll::Pending;
}
if !self.is_bootstrap_in_kad_routing_table {
if self.is_connected_to_bootstrap_peer && !self.is_bootstrap_in_kad_routing_table {
self.is_bootstrap_in_kad_routing_table = true;
return Poll::Ready(ToSwarm::GenerateEvent(
ToOtherBehaviourEvent::FoundListenAddresses {
Expand All @@ -173,19 +146,55 @@ impl NetworkBehaviour for Behaviour {
));
}

if self.is_query_running {
self.wakers_waiting_for_query_to_finish.push(cx.waker().clone());
return Poll::Pending;
}
if let Some(sleep_future) = &mut self.query_sleep_future {
pin_mut!(sleep_future);
ready!(sleep_future.poll(cx));
}
self.is_query_running = true;
self.query_sleep_future = None;
Poll::Ready(ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(
libp2p::identity::PeerId::random(),
)))
// Unpacking self so that we can create 2 futures that use different members of self
let Self {
is_dialing_to_bootstrap_peer,
is_connected_to_bootstrap_peer,
sleep_future_for_dialing_bootstrap_peer,
bootstrap_peer_id,
bootstrap_peer_address,
query_sleep_future,
config,
..
} = self;

let bootstrap_dial_future = async move {
if !(*is_dialing_to_bootstrap_peer) && !(*is_connected_to_bootstrap_peer) {
if let Some(sleep_future) = sleep_future_for_dialing_bootstrap_peer {
sleep_future.await;
}
*is_dialing_to_bootstrap_peer = true;
*sleep_future_for_dialing_bootstrap_peer = None;
return ToSwarm::Dial {
opts: DialOpts::peer_id(*bootstrap_peer_id)
.addresses(vec![bootstrap_peer_address.clone()])
// The peer manager might also be dialing to the bootstrap node.
.condition(PeerCondition::DisconnectedAndNotDialing)
.build(),
};
}
// We're either already dialing or already connected to the bootstrap peer. Nothing to do.
// TODO: register a waker here and wake it when we receive an event that we've
// disconnected from the bootstrap peer.
pending().await
};
pin_mut!(bootstrap_dial_future);
let kad_future = async move {
if let Some(sleep_future) = query_sleep_future {
sleep_future.await;
}
*query_sleep_future = Some(tokio::time::sleep(config.heartbeat_interval).boxed());
ToSwarm::GenerateEvent(ToOtherBehaviourEvent::RequestKadQuery(
libp2p::identity::PeerId::random(),
))
};
pin_mut!(kad_future);

// Polling both futures together since each of them may be waiting on its own sleep.
let select_future = select(bootstrap_dial_future, kad_future);
pin_mut!(select_future);
let (Either::Left((event, _)) | Either::Right((event, _))) = ready!(select_future.poll(cx));
Poll::Ready(event)
}
}

Expand Down Expand Up @@ -279,14 +288,12 @@ impl Behaviour {
let bootstrap_dial_retry_strategy = config.bootstrap_dial_retry_config.strategy();
Self {
config,
is_query_running: false,
bootstrap_peer_id,
bootstrap_peer_address,
is_dialing_to_bootstrap_peer: false,
sleep_future_for_dialing_bootstrap_peer: None,
is_connected_to_bootstrap_peer: false,
is_bootstrap_in_kad_routing_table: false,
wakers_waiting_for_query_to_finish: Vec::new(),
bootstrap_dial_retry_strategy,
query_sleep_future: None,
}
Expand All @@ -312,16 +319,5 @@ impl From<ToOtherBehaviourEvent> for mixed_behaviour::Event {
}

impl BridgedBehaviour for Behaviour {
fn on_other_behaviour_event(&mut self, event: &mixed_behaviour::ToOtherBehaviourEvent) {
let mixed_behaviour::ToOtherBehaviourEvent::Kad(KadToOtherBehaviourEvent::KadQueryFinished) =
event
else {
return;
};
for waker in self.wakers_waiting_for_query_to_finish.drain(..) {
waker.wake();
}
self.query_sleep_future = Some(tokio::time::sleep(self.config.heartbeat_interval).boxed());
self.is_query_running = false;
}
fn on_other_behaviour_event(&mut self, _event: &mixed_behaviour::ToOtherBehaviourEvent) {}
}
Loading
Loading