From c60f856e1055a91bb56dae023308cffc3843c7ea Mon Sep 17 00:00:00 2001 From: Shahak Shama Date: Thu, 25 Jul 2024 07:54:22 +0300 Subject: [PATCH] revert(network): remove send_query from sqmr::Behaviour --- crates/papyrus_network/Cargo.toml | 7 +- .../src/bin/streamed_bytes_benchmark.rs | 331 ------------------ crates/papyrus_network/src/sqmr/behaviour.rs | 83 +---- .../src/sqmr/behaviour_test.rs | 112 +++--- crates/papyrus_network/src/sqmr/flow_test.rs | 106 ++++-- crates/papyrus_network/src/test_utils/mod.rs | 77 +++- 6 files changed, 223 insertions(+), 493 deletions(-) delete mode 100644 crates/papyrus_network/src/bin/streamed_bytes_benchmark.rs diff --git a/crates/papyrus_network/Cargo.toml b/crates/papyrus_network/Cargo.toml index aa3b18d7b5..d417f71765 100644 --- a/crates/papyrus_network/Cargo.toml +++ b/crates/papyrus_network/Cargo.toml @@ -8,15 +8,9 @@ license-file.workspace = true [features] testing = [] -[[bin]] -name = "streamed_bytes_benchmark" -required-features = ["clap"] -path = "src/bin/streamed_bytes_benchmark.rs" - [dependencies] async-stream.workspace = true bytes.workspace = true -defaultmap.workspace = true derive_more.workspace = true futures.workspace = true lazy_static.workspace = true @@ -50,6 +44,7 @@ clap = { workspace = true, optional = true, features = ["derive"] } [dev-dependencies] assert_matches.workspace = true deadqueue = { workspace = true, features = ["unlimited"] } +defaultmap.workspace = true libp2p-swarm-test.workspace = true mockall.workspace = true pretty_assertions.workspace = true diff --git a/crates/papyrus_network/src/bin/streamed_bytes_benchmark.rs b/crates/papyrus_network/src/bin/streamed_bytes_benchmark.rs deleted file mode 100644 index 21f3fa8dab..0000000000 --- a/crates/papyrus_network/src/bin/streamed_bytes_benchmark.rs +++ /dev/null @@ -1,331 +0,0 @@ -use std::collections::HashMap; -use std::time::{Duration, Instant}; - -use clap::Parser; -use futures::StreamExt; -use libp2p::swarm::SwarmEvent; -use libp2p::{PeerId, StreamProtocol, Swarm}; -use papyrus_network::bin_utils::{build_swarm, dial}; -use papyrus_network::sqmr::behaviour::{Behaviour, Event, ExternalEvent, SessionError}; -use papyrus_network::sqmr::{Bytes, Config, InboundSessionId, OutboundSessionId, SessionId}; - -const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/papyrus/bench/1"); -const CONST_BYTE: u8 = 1; - -fn pretty_size(mut size: f64) -> String { - for term in ["B", "KB", "MB", "GB"] { - if size < 1024.0 { - return format!("{:.2} {}", size, term); - } - size /= 1024.0; - } - format!("{:.2} TB", size) -} - -fn encode_inbound_session_metadata(num_messages: usize, message_size: usize) -> Bytes { - let mut result = num_messages.to_be_bytes().to_vec(); - result.extend_from_slice(&message_size.to_be_bytes()); - result -} - -fn decode_inbound_session_metadata(mut bytes: Bytes) -> (usize, usize) { - let second_bytes = bytes.split_off(8); - ( - usize::from_be_bytes( - bytes.try_into().expect("Failed converting a vec of size 8 to [u8; 8]"), - ), - usize::from_be_bytes( - second_bytes - .try_into() - .expect("Called decode_inbound_session_metadata on Vec of size not 16"), - ), - ) -} - -/// A node that benchmarks the throughput of messages sent/received. -#[derive(Parser)] -#[command(author, version, about, long_about = None)] -struct Args { - /// Address this node listens on for incoming connections. - #[arg(short, long)] - listen_address: String, - - /// Address this node attempts to dial to. - #[arg(short, long)] - dial_address: Option, - - /// Amount of expected inbound sessions. - #[arg(short = 'i', long)] - num_expected_inbound_sessions: usize, - - /// Amount of expected peers to connect to this peer (dial or listener). - #[arg(short = 'c', long)] - num_expected_connections: usize, - - /// Number of queries to send for each node that we connect to (whether we dialed to it or it - /// dialed to us). - #[arg(short = 'q', long, default_value_t)] - num_queries_per_connection: u64, - - /// Number of messages to send for each inbound session. - #[arg(short = 'm', long, default_value_t)] - num_messages_per_session: usize, - - /// Size (in bytes) of each message to send for inbound sessions. - #[arg(short = 's', long, default_value_t)] - message_size: usize, - - /// Amount of time (in seconds) to wait until closing an unactive connection. - #[arg(short = 't', long, default_value_t = 10)] - idle_connection_timeout: u64, -} - -fn create_outbound_sessions_if_all_peers_connected( - swarm: &mut Swarm, - peer_id: PeerId, - outbound_session_measurements: &mut HashMap, - peers_pending_outbound_session: &mut Vec, - args: &Args, -) { - peers_pending_outbound_session.push(peer_id); - if peers_pending_outbound_session.len() >= args.num_expected_connections { - for peer_id in peers_pending_outbound_session { - for _ in 0..args.num_queries_per_connection { - let outbound_session_id = - swarm.behaviour_mut().send_query(vec![], *peer_id, PROTOCOL_NAME).expect( - "There's no connection to a peer immediately after we got a \ - ConnectionEstablished event", - ); - outbound_session_measurements - .insert(outbound_session_id, OutboundSessionMeasurement::new()); - } - } - } -} - -fn send_response_to_inbound_sessions( - swarm: &mut Swarm, - inbound_session_to_messages: &mut HashMap>>, - args: &Args, -) { - for inbound_session_id in inbound_session_to_messages.keys() { - swarm - .behaviour_mut() - .send_response( - encode_inbound_session_metadata(args.num_messages_per_session, args.message_size), - *inbound_session_id, - ) - .unwrap_or_else(|_| { - panic!("Inbound session {} dissappeared unexpectedly", inbound_session_id) - }); - } - while !inbound_session_to_messages.is_empty() { - inbound_session_to_messages.retain(|inbound_session_id, messages| match messages.pop() { - Some(message) => { - swarm.behaviour_mut().send_response(message, *inbound_session_id).unwrap_or_else( - |_| panic!("Inbound session {} dissappeared unexpectedly", inbound_session_id), - ); - - true - } - None => { - swarm.behaviour_mut().close_inbound_session(*inbound_session_id).unwrap_or_else( - |_| panic!("Inbound session {} dissappeared unexpectedly", inbound_session_id), - ); - false - } - }) - } -} - -// TODO(shahak) extract to other file. -struct OutboundSessionMeasurement { - start_time: Instant, - first_message_time: Option, - num_messages: Option, - message_size: Option, -} - -impl OutboundSessionMeasurement { - pub fn print(&self) { - let Some(first_message_time) = self.first_message_time else { - println!( - "An outbound session finished with no messages, skipping time measurements -display" - ); - return; - }; - let messages_elapsed = first_message_time.elapsed(); - let elapsed = self.start_time.elapsed(); - let num_messages = self.num_messages.expect( - "OutboundSessionMeasurement's first_message_time field was set while the num_messages \ - field wasn't set", - ); - let message_size = self.message_size.expect( - "OutboundSessionMeasurement's first_message_time field was set while the message_size \ - field wasn't set", - ); - println!("########## Outbound session finished ##########"); - println!( - "Session had {} messages of size {}. In total {}", - num_messages, - pretty_size(message_size as f64), - pretty_size((message_size * num_messages) as f64), - ); - println!("Session took {:.3} seconds", elapsed.as_secs_f64()); - println!("Message sending took {:.3} seconds", messages_elapsed.as_secs_f64()); - println!("---- Total session statistics ----"); - println!("{:.2} messages/second", num_messages as f64 / elapsed.as_secs_f64()); - println!( - "{}/second", - pretty_size((message_size * num_messages) as f64 / elapsed.as_secs_f64()) - ); - println!("---- Message sending statistics ----"); - println!("{:.2} messages/second", num_messages as f64 / messages_elapsed.as_secs_f64()); - println!( - "{}/second", - pretty_size((message_size * num_messages) as f64 / messages_elapsed.as_secs_f64()) - ); - } - - pub fn new() -> Self { - Self { - start_time: Instant::now(), - first_message_time: None, - num_messages: None, - message_size: None, - } - } - pub fn report_first_message(&mut self, response: Bytes) { - self.first_message_time = Some(Instant::now()); - let (num_messages, message_size) = decode_inbound_session_metadata(response); - self.num_messages = Some(num_messages); - self.message_size = Some(message_size); - } -} - -fn dial_if_requested(swarm: &mut Swarm, args: &Args) { - if let Some(dial_address) = args.dial_address.as_ref() { - dial(swarm, dial_address); - } -} - -#[tokio::main] -async fn main() { - let args = Args::parse(); - - // TODO: add secret key to the args and replace None with it. - let mut swarm = build_swarm( - vec![args.listen_address.clone()], - Duration::from_secs(args.idle_connection_timeout), - None, - |_| { - let mut behaviour = - Behaviour::new(Config { session_timeout: Duration::from_secs(3600) }); - behaviour.add_new_supported_inbound_protocol(PROTOCOL_NAME); - behaviour - }, - ); - let mut outbound_session_measurements = HashMap::new(); - let mut inbound_session_to_messages = HashMap::new(); - let mut connected_in_the_past = false; - - let mut preprepared_messages = (0..args.num_expected_inbound_sessions) - .map(|_| { - (0..args.num_messages_per_session) - .map(|_| vec![CONST_BYTE; args.message_size]) - .collect::>() - }) - .collect::>(); - - let mut peers_pending_outbound_session = Vec::new(); - println!("Preprepared messages for sending"); - - dial_if_requested(&mut swarm, &args); - - while let Some(event) = swarm.next().await { - match event { - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - println!("Connected to a peer!"); - connected_in_the_past = true; - create_outbound_sessions_if_all_peers_connected( - &mut swarm, - peer_id, - &mut outbound_session_measurements, - &mut peers_pending_outbound_session, - &args, - ); - } - SwarmEvent::Behaviour(Event::External(ExternalEvent::NewInboundSession { - inbound_session_id, - .. - })) => { - inbound_session_to_messages.insert( - inbound_session_id, - preprepared_messages - .pop() - .expect("There are more inbound sessions than expected"), - ); - if preprepared_messages.is_empty() { - send_response_to_inbound_sessions( - &mut swarm, - &mut inbound_session_to_messages, - &args, - ); - } - } - SwarmEvent::Behaviour(Event::External( - ExternalEvent::SessionFinishedSuccessfully { - session_id: SessionId::OutboundSessionId(outbound_session_id), - }, - )) => { - outbound_session_measurements[&outbound_session_id].print(); - } - SwarmEvent::Behaviour(Event::External(ExternalEvent::ReceivedResponse { - outbound_session_id, - response, - peer_id: _, - })) => { - if response[0] != CONST_BYTE { - outbound_session_measurements - .get_mut(&outbound_session_id) - .expect("Received response on non-existing outbound session") - .report_first_message(response); - } - } - SwarmEvent::OutgoingConnectionError { .. } => { - dial_if_requested(&mut swarm, &args); - } - SwarmEvent::Behaviour(Event::External(ExternalEvent::SessionFailed { - session_id, - error: SessionError::ConnectionClosed, - })) => { - println!( - "Session {:?} failed on ConnectionClosed. Try to increase \ - idle_connection_timeout", - session_id - ); - } - SwarmEvent::Behaviour(Event::External(ExternalEvent::SessionFailed { - session_id, - error: SessionError::IOError(io_error), - })) => { - println!("Session {:?} failed on {}", session_id, io_error.kind()); - } - SwarmEvent::Behaviour(Event::External( - ExternalEvent::SessionFinishedSuccessfully { - session_id: SessionId::InboundSessionId(_), - }, - )) - | SwarmEvent::NewListenAddr { .. } - | SwarmEvent::IncomingConnection { .. } - | SwarmEvent::ConnectionClosed { .. } => {} - _ => { - panic!("Unexpected event {:?}", event); - } - } - if connected_in_the_past && swarm.network_info().num_peers() == 0 { - break; - } - } -} diff --git a/crates/papyrus_network/src/sqmr/behaviour.rs b/crates/papyrus_network/src/sqmr/behaviour.rs index 0c646dfaad..88787ceeb0 100644 --- a/crates/papyrus_network/src/sqmr/behaviour.rs +++ b/crates/papyrus_network/src/sqmr/behaviour.rs @@ -9,9 +9,7 @@ use std::sync::Arc; use std::task::{Context, Poll, Waker}; use std::time::Duration; -use defaultmap::DefaultHashMap; use libp2p::core::Endpoint; -use libp2p::swarm::behaviour::ConnectionEstablished; use libp2p::swarm::{ ConnectionClosed, ConnectionDenied, @@ -110,8 +108,6 @@ pub struct PeerNotConnected; pub struct Behaviour { config: Config, pending_events: VecDeque>, - // TODO(shahak) Remove this once we remove send_query. - connection_ids_map: DefaultHashMap>, session_id_to_peer_id_and_connection_id: HashMap, next_outbound_session_id: OutboundSessionId, next_inbound_session_id: Arc, @@ -126,7 +122,6 @@ impl Behaviour { Self { config, pending_events: Default::default(), - connection_ids_map: Default::default(), session_id_to_peer_id_and_connection_id: Default::default(), next_outbound_session_id: Default::default(), next_inbound_session_id: Arc::new(Default::default()), @@ -137,37 +132,6 @@ impl Behaviour { } } - /// Send query to the given peer and start a new outbound session with it. Return the id of the - /// new session. - // TODO(shahak) Remove this function once Network manager uses start_query. - pub fn send_query( - &mut self, - query: Bytes, - peer_id: PeerId, - protocol_name: StreamProtocol, - ) -> Result { - let connection_id = - *self.connection_ids_map.get(peer_id).iter().next().ok_or(PeerNotConnected)?; - - let outbound_session_id = self.next_outbound_session_id; - self.next_outbound_session_id.value += 1; - - self.session_id_to_peer_id_and_connection_id - .insert(outbound_session_id.into(), (peer_id, connection_id)); - - self.add_event_to_queue(ToSwarm::NotifyHandler { - peer_id, - handler: NotifyHandler::One(connection_id), - event: RequestFromBehaviourEvent::CreateOutboundSession { - query, - outbound_session_id, - protocol_name, - }, - }); - - Ok(outbound_session_id) - } - /// Assign some peer and start a query. Return the id of the new session. pub fn start_query( &mut self, @@ -292,36 +256,25 @@ impl NetworkBehaviour for Behaviour { } fn on_swarm_event(&mut self, event: FromSwarm<'_>) { - match event { - FromSwarm::ConnectionEstablished(ConnectionEstablished { - peer_id, - connection_id, - .. - }) => { - self.connection_ids_map.get_mut(peer_id).insert(connection_id); - } - FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, .. }) => { - let mut session_ids = Vec::new(); - self.session_id_to_peer_id_and_connection_id.retain( - |session_id, (session_peer_id, session_connection_id)| { - if peer_id == *session_peer_id && connection_id == *session_connection_id { - session_ids.push(*session_id); - false - } else { - true - } - }, - ); - for session_id in session_ids { - self.add_event_to_queue(ToSwarm::GenerateEvent(Event::External( - ExternalEvent::SessionFailed { - session_id, - error: SessionError::ConnectionClosed, - }, - ))); + let FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, .. }) = event + else { + return; + }; + let mut session_ids = Vec::new(); + self.session_id_to_peer_id_and_connection_id.retain( + |session_id, (session_peer_id, session_connection_id)| { + if peer_id == *session_peer_id && connection_id == *session_connection_id { + session_ids.push(*session_id); + false + } else { + true } - } - _ => {} + }, + ); + for session_id in session_ids { + self.add_event_to_queue(ToSwarm::GenerateEvent(Event::External( + ExternalEvent::SessionFailed { session_id, error: SessionError::ConnectionClosed }, + ))); } } diff --git a/crates/papyrus_network/src/sqmr/behaviour_test.rs b/crates/papyrus_network/src/sqmr/behaviour_test.rs index 9bc73cfa49..b9f5f5e9d0 100644 --- a/crates/papyrus_network/src/sqmr/behaviour_test.rs +++ b/crates/papyrus_network/src/sqmr/behaviour_test.rs @@ -1,5 +1,3 @@ -// TODO(shahak): Use start_query in all tests instead of send_query - use std::pin::Pin; use std::task::{Context, Poll}; @@ -7,14 +5,15 @@ use assert_matches::assert_matches; use futures::{FutureExt, Stream, StreamExt}; use lazy_static::lazy_static; use libp2p::core::{ConnectedPoint, Endpoint}; -use libp2p::swarm::behaviour::ConnectionEstablished; use libp2p::swarm::{ConnectionClosed, ConnectionId, FromSwarm, NetworkBehaviour, ToSwarm}; use libp2p::{Multiaddr, PeerId, StreamProtocol}; use super::super::handler::{RequestFromBehaviourEvent, RequestToBehaviourEvent}; use super::super::{Bytes, Config, GenericEvent, InboundSessionId, OutboundSessionId, SessionId}; -use super::{Behaviour, Event, ExternalEvent, SessionError}; +use super::{Behaviour, Event, ExternalEvent, SessionError, ToOtherBehaviourEvent}; +use crate::mixed_behaviour::BridgedBehaviour; use crate::test_utils::dummy_data; +use crate::{mixed_behaviour, peer_manager}; impl Unpin for Behaviour {} @@ -34,37 +33,19 @@ lazy_static! { static ref PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/"); } -fn simulate_connection_established(behaviour: &mut Behaviour, peer_id: PeerId) { - let connection_id = ConnectionId::new_unchecked(0); - let address = Multiaddr::empty(); - let role_override = Endpoint::Dialer; - let _handler = behaviour - .handle_established_outbound_connection(connection_id, peer_id, &address, role_override) - .unwrap(); - behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { - peer_id, - connection_id, - endpoint: &ConnectedPoint::Dialer { address, role_override }, - failed_addresses: &[], - other_established: 0, - })); -} - -fn simulate_listener_connection(behaviour: &mut Behaviour, peer_id: PeerId) { - let connection_id = ConnectionId::new_unchecked(0); - let address = Multiaddr::empty(); - let local_addr = Multiaddr::empty(); - let role_override = Endpoint::Listener; - let _handler = behaviour - .handle_established_outbound_connection(connection_id, peer_id, &address, role_override) - .unwrap(); - behaviour.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished { - peer_id, - connection_id, - endpoint: &ConnectedPoint::Listener { send_back_addr: address, local_addr }, - failed_addresses: &[], - other_established: 0, - })); +fn simulate_peer_assigned( + behaviour: &mut Behaviour, + peer_id: PeerId, + outbound_session_id: OutboundSessionId, +) { + behaviour.on_other_behaviour_event(&mixed_behaviour::ToOtherBehaviourEvent::PeerManager( + peer_manager::ToOtherBehaviourEvent::SessionAssigned { + outbound_session_id, + peer_id, + // TODO(shahak): Add test with multiple connections + connection_id: ConnectionId::new_unchecked(0), + }, + )); } fn simulate_new_inbound_session( @@ -75,6 +56,7 @@ fn simulate_new_inbound_session( ) { behaviour.on_connection_handler_event( peer_id, + // This is the same connection_id from simulate_peer_assigned ConnectionId::new_unchecked(0), RequestToBehaviourEvent::GenerateEvent(GenericEvent::NewInboundSession { query, @@ -93,6 +75,7 @@ fn simulate_received_response( ) { behaviour.on_connection_handler_event( peer_id, + // This is the same connection_id from simulate_peer_assigned ConnectionId::new_unchecked(0), RequestToBehaviourEvent::GenerateEvent(GenericEvent::ReceivedResponse { response, @@ -109,6 +92,7 @@ fn simulate_session_finished_successfully( ) { behaviour.on_connection_handler_event( peer_id, + // This is the same connection_id from simulate_peer_assigned ConnectionId::new_unchecked(0), RequestToBehaviourEvent::GenerateEvent(GenericEvent::SessionFinishedSuccessfully { session_id, @@ -117,7 +101,7 @@ fn simulate_session_finished_successfully( } fn simulate_connection_closed(behaviour: &mut Behaviour, peer_id: PeerId) { - // This is the same connection_id from simulate_connection_established + // This is the same connection_id from simulate_peer_assigned let connection_id = ConnectionId::new_unchecked(0); behaviour.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, @@ -140,6 +124,20 @@ fn simulate_session_dropped(behaviour: &mut Behaviour, peer_id: PeerId, session_ ); } +async fn validate_request_peer_assignment_event( + behaviour: &mut Behaviour, + outbound_session_id: OutboundSessionId, +) { + let event = behaviour.next().await.unwrap(); + assert_matches!( + event, + ToSwarm::GenerateEvent(Event::ToOtherBehaviourEvent(ToOtherBehaviourEvent::RequestPeerAssignment { + outbound_session_id: event_outbound_session_id + }, + )) if outbound_session_id == event_outbound_session_id + ); +} + async fn validate_create_outbound_session_event( behaviour: &mut Behaviour, peer_id: &PeerId, @@ -280,8 +278,6 @@ async fn process_inbound_session() { let peer_id = PeerId::random(); let inbound_session_id = InboundSessionId::default(); - simulate_listener_connection(&mut behaviour, peer_id); - simulate_new_inbound_session(&mut behaviour, peer_id, inbound_session_id, QUERY.clone()); validate_new_inbound_session_event(&mut behaviour, &peer_id, inbound_session_id, &QUERY).await; validate_no_events(&mut behaviour); @@ -319,9 +315,11 @@ async fn create_and_process_outbound_session() { let peer_id = PeerId::random(); - simulate_connection_established(&mut behaviour, peer_id); - let outbound_session_id = - behaviour.send_query(QUERY.clone(), peer_id, PROTOCOL_NAME.clone()).unwrap(); + let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone()); + + validate_request_peer_assignment_event(&mut behaviour, outbound_session_id).await; + validate_no_events(&mut behaviour); + simulate_peer_assigned(&mut behaviour, peer_id, outbound_session_id); validate_create_outbound_session_event(&mut behaviour, &peer_id, &QUERY, &outbound_session_id) .await; @@ -351,17 +349,17 @@ async fn connection_closed() { let peer_id = PeerId::random(); - simulate_connection_established(&mut behaviour, peer_id); - - let outbound_session_id = - behaviour.send_query(QUERY.clone(), peer_id, PROTOCOL_NAME.clone()).unwrap(); - + // Add an outbound session on the connection. + let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone()); + // Consume the event to request peer assignment. + behaviour.next().await.unwrap(); + simulate_peer_assigned(&mut behaviour, peer_id, outbound_session_id); // Consume the event to create an outbound session. behaviour.next().await.unwrap(); + // Add an inbound session on the connection. let inbound_session_id = InboundSessionId::default(); simulate_new_inbound_session(&mut behaviour, peer_id, inbound_session_id, QUERY.clone()); - // Consume the event to notify the user about the new inbound session. behaviour.next().await.unwrap(); @@ -399,11 +397,10 @@ async fn drop_outbound_session() { let peer_id = PeerId::random(); - simulate_connection_established(&mut behaviour, peer_id); - - let outbound_session_id = - behaviour.send_query(QUERY.clone(), peer_id, PROTOCOL_NAME.clone()).unwrap(); - + let outbound_session_id = behaviour.start_query(QUERY.clone(), PROTOCOL_NAME.clone()); + // Consume the event to request peer assignment. + behaviour.next().await.unwrap(); + simulate_peer_assigned(&mut behaviour, peer_id, outbound_session_id); // Consume the event to create an outbound session. behaviour.next().await.unwrap(); @@ -433,8 +430,6 @@ async fn drop_inbound_session() { let peer_id = PeerId::random(); let inbound_session_id = InboundSessionId::default(); - simulate_listener_connection(&mut behaviour, peer_id); - simulate_new_inbound_session(&mut behaviour, peer_id, inbound_session_id, QUERY.clone()); // Consume the event that a new inbound session was created. @@ -466,12 +461,3 @@ fn send_response_non_existing_session_fails() { behaviour.send_response(response, InboundSessionId::default()).unwrap_err(); } } - -#[test] -fn send_query_peer_not_connected_fails() { - let mut behaviour = Behaviour::new(Config::get_test_config()); - - let peer_id = PeerId::random(); - - behaviour.send_query(QUERY.clone(), peer_id, PROTOCOL_NAME.clone()).unwrap_err(); -} diff --git a/crates/papyrus_network/src/sqmr/flow_test.rs b/crates/papyrus_network/src/sqmr/flow_test.rs index 820ab8cfe6..619bdafeb5 100644 --- a/crates/papyrus_network/src/sqmr/flow_test.rs +++ b/crates/papyrus_network/src/sqmr/flow_test.rs @@ -5,13 +5,15 @@ use std::time::Duration; use defaultmap::DefaultHashMap; use futures::StreamExt; -use libp2p::swarm::{NetworkBehaviour, SwarmEvent}; +use libp2p::swarm::{ConnectionId, NetworkBehaviour, SwarmEvent}; use libp2p::{PeerId, StreamProtocol, Swarm}; -use super::behaviour::{Behaviour, Event, ExternalEvent}; +use super::behaviour::{Behaviour, Event, ExternalEvent, ToOtherBehaviourEvent}; use super::{Bytes, Config, InboundSessionId, OutboundSessionId, SessionId}; +use crate::mixed_behaviour::BridgedBehaviour; use crate::test_utils::create_fully_connected_swarms_stream; use crate::utils::StreamHashMap; +use crate::{mixed_behaviour, peer_manager}; const NUM_PEERS: usize = 3; const NUM_MESSAGES_PER_SESSION: usize = 5; @@ -59,23 +61,36 @@ fn perform_action_on_swarms( } } -fn send_query_and_update_map( +fn start_query_and_update_map( outbound_swarm: &mut Swarm, inbound_peer_id: PeerId, outbound_session_id_to_peer_id: &mut HashMap<(PeerId, OutboundSessionId), PeerId>, ) { let outbound_peer_id = *outbound_swarm.local_peer_id(); - let outbound_session_id = outbound_swarm - .behaviour_mut() - .send_query( - get_bytes_from_query_indices(outbound_peer_id, inbound_peer_id), - inbound_peer_id, - PROTOCOL_NAME, - ) - .unwrap(); + let outbound_session_id = outbound_swarm.behaviour_mut().start_query( + get_bytes_from_query_indices(outbound_peer_id, inbound_peer_id), + PROTOCOL_NAME, + ); outbound_session_id_to_peer_id.insert((outbound_peer_id, outbound_session_id), inbound_peer_id); } +fn assign_peer_to_outbound_session( + outbound_swarm: &mut Swarm, + inbound_peer_id: PeerId, + outbound_session_id: OutboundSessionId, + connection_id: ConnectionId, +) { + outbound_swarm.behaviour_mut().on_other_behaviour_event( + &mixed_behaviour::ToOtherBehaviourEvent::PeerManager( + peer_manager::ToOtherBehaviourEvent::SessionAssigned { + outbound_session_id, + peer_id: inbound_peer_id, + connection_id, + }, + ), + ); +} + fn send_response( inbound_swarm: &mut Swarm, outbound_peer_id: PeerId, @@ -105,6 +120,25 @@ fn close_inbound_session( .unwrap(); } +fn check_request_peer_assignment_event_and_return_session_id( + outbound_peer_id: PeerId, + swarm_event: SwarmEventAlias, + outbound_session_id_to_peer_id: &HashMap<(PeerId, OutboundSessionId), PeerId>, +) -> Option<(PeerId, OutboundSessionId)> { + let SwarmEvent::Behaviour(event) = swarm_event else { + return None; + }; + let Event::ToOtherBehaviourEvent(ToOtherBehaviourEvent::RequestPeerAssignment { + outbound_session_id, + }) = event + else { + panic!("Got unexpected event {:?} when expecting RequestPeerAssignment", event); + }; + let assigned_peer_id = + *outbound_session_id_to_peer_id.get(&(outbound_peer_id, outbound_session_id)).unwrap(); + Some((assigned_peer_id, outbound_session_id)) +} + fn check_new_inbound_session_event_and_return_id( inbound_peer_id: PeerId, swarm_event: SwarmEventAlias, @@ -188,15 +222,16 @@ fn get_response_from_indices(peer_id1: PeerId, peer_id2: PeerId, message_index: #[tokio::test] async fn everyone_sends_to_everyone() { - let mut swarms_stream = create_fully_connected_swarms_stream(NUM_PEERS, || { - let mut behaviour = Behaviour::new(Config { session_timeout: Duration::from_secs(5) }); - let supported_inbound_protocols = vec![PROTOCOL_NAME, OTHER_PROTOCOL_NAME]; - for protocol in supported_inbound_protocols { - behaviour.add_new_supported_inbound_protocol(protocol); - } - behaviour - }) - .await; + let (mut swarms_stream, connection_ids) = + create_fully_connected_swarms_stream(NUM_PEERS, || { + let mut behaviour = Behaviour::new(Config { session_timeout: Duration::from_secs(5) }); + let supported_inbound_protocols = vec![PROTOCOL_NAME, OTHER_PROTOCOL_NAME]; + for protocol in supported_inbound_protocols { + behaviour.add_new_supported_inbound_protocol(protocol); + } + behaviour + }) + .await; let peer_ids = swarms_stream.keys().copied().collect::>(); @@ -205,7 +240,7 @@ async fn everyone_sends_to_everyone() { &mut swarms_stream, &peer_ids, &mut |outbound_swarm, inbound_peer_id| { - send_query_and_update_map( + start_query_and_update_map( outbound_swarm, inbound_peer_id, &mut outbound_session_id_to_peer_id, @@ -213,6 +248,35 @@ async fn everyone_sends_to_everyone() { }, ); + let peers_to_outbound_session_id = collect_events_from_swarms( + &mut swarms_stream, + |peer_id, event| { + check_request_peer_assignment_event_and_return_session_id( + peer_id, + event, + &outbound_session_id_to_peer_id, + ) + }, + true, + ) + .await; + perform_action_on_swarms( + &mut swarms_stream, + &peer_ids, + &mut |outbound_swarm, inbound_peer_id| { + let outbound_peer_id = *outbound_swarm.local_peer_id(); + let outbound_session_id = + *peers_to_outbound_session_id.get(&(outbound_peer_id, inbound_peer_id)).unwrap(); + let connection_id = *connection_ids.get(&(outbound_peer_id, inbound_peer_id)).unwrap(); + assign_peer_to_outbound_session( + outbound_swarm, + inbound_peer_id, + outbound_session_id, + connection_id, + ) + }, + ); + let inbound_session_ids = collect_events_from_swarms( &mut swarms_stream, check_new_inbound_session_event_and_return_id, diff --git a/crates/papyrus_network/src/test_utils/mod.rs b/crates/papyrus_network/src/test_utils/mod.rs index fc54f66823..9afbb35347 100644 --- a/crates/papyrus_network/src/test_utils/mod.rs +++ b/crates/papyrus_network/src/test_utils/mod.rs @@ -1,15 +1,16 @@ mod get_stream; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::pin::Pin; use std::task::{ready, Context, Poll}; use std::time::Duration; -use futures::future::Future; +use futures::future::{Either, Future}; use futures::pin_mut; use futures::stream::Stream as StreamTrait; -use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p::swarm::dial_opts::{DialOpts, PeerCondition}; +use libp2p::swarm::{ConnectionId, NetworkBehaviour, Swarm, SwarmEvent}; use libp2p::{PeerId, Stream, StreamProtocol}; use libp2p_swarm_test::SwarmExt; use tokio::sync::Mutex; @@ -60,11 +61,11 @@ impl crate::sqmr::handler::Handler { } /// Create num_swarms swarms and connect each pair of swarms. Return them as a combined stream of -/// events. +/// events. Also return all the connection ids of the created connections pub(crate) async fn create_fully_connected_swarms_stream( num_swarms: usize, behaviour_gen: impl Fn() -> TBehaviour, -) -> StreamHashMap> +) -> (StreamHashMap>, HashMap<(PeerId, PeerId), ConnectionId>) where ::ToSwarm: Debug, { @@ -75,15 +76,77 @@ where swarm.listen().with_memory_addr_external().await; } + let mut connection_ids = HashMap::new(); + for i in 0..(swarms.len() - 1) { let (swarms1, swarms2) = swarms.split_at_mut(i + 1); let swarm1 = &mut swarms1[i]; + let peer_id1 = *swarm1.local_peer_id(); for swarm2 in swarms2 { - swarm1.connect(swarm2).await; + let (connection_id1, connection_id2) = connect_swarms(swarm1, swarm2).await; + let peer_id2 = *swarm2.local_peer_id(); + connection_ids.insert((peer_id1, peer_id2), connection_id1); + connection_ids.insert((peer_id2, peer_id1), connection_id2); } } - StreamHashMap::new(swarms.into_iter().map(|swarm| (*swarm.local_peer_id(), swarm)).collect()) + ( + StreamHashMap::new( + swarms.into_iter().map(|swarm| (*swarm.local_peer_id(), swarm)).collect(), + ), + connection_ids, + ) +} + +// Copied from SwarmExt::connect, but this function returns the connection id. +/// Connect two swarms and return the connection id that each swarm gave to this connection. +async fn connect_swarms( + swarm1: &mut Swarm, + swarm2: &mut Swarm, +) -> (ConnectionId, ConnectionId) +where + ::ToSwarm: Debug, +{ + let external_addresses = swarm2.external_addresses().cloned().collect(); + + let dial_opts = DialOpts::peer_id(*swarm2.local_peer_id()) + .addresses(external_addresses) + .condition(PeerCondition::Always) + .build(); + + swarm1.dial(dial_opts).unwrap(); + + let mut dialer_connection_id = None; + let mut listener_connection_id = None; + + loop { + match futures::future::select(swarm1.next_swarm_event(), swarm2.next_swarm_event()).await { + Either::Left((SwarmEvent::ConnectionEstablished { connection_id, .. }, _)) => { + dialer_connection_id = Some(connection_id); + } + Either::Right((SwarmEvent::ConnectionEstablished { connection_id, .. }, _)) => { + listener_connection_id = Some(connection_id); + } + Either::Left((swarm2, _)) => { + tracing::debug!( + dialer=?swarm2, + "Ignoring event from dialer" + ); + } + Either::Right((swarm2, _)) => { + tracing::debug!( + listener=?swarm2, + "Ignoring event from listener" + ); + } + } + + if let Some((dialer_connection_id, listener_connection_id)) = + dialer_connection_id.zip(listener_connection_id) + { + return (dialer_connection_id, listener_connection_id); + } + } } // I tried making this generic on the async function we run, but it caused a lot of lifetime