diff --git a/crates/papyrus_network/src/network_manager/mod.rs b/crates/papyrus_network/src/network_manager/mod.rs index 2e723469c3..4f70512a87 100644 --- a/crates/papyrus_network/src/network_manager/mod.rs +++ b/crates/papyrus_network/src/network_manager/mod.rs @@ -55,6 +55,35 @@ pub struct GenericNetworkManager { } impl GenericNetworkManager { + /// Starts processing network events. This function runs indefinitely, handling connections, + /// disconnections, messages, and other network-related events as they occur. + /// + /// The function processes events from the `swarm`, handles incoming and outgoing SQMR messages, + /// and manages broadcasts and peer reports. It utilizes `tokio::select!` to concurrently + /// handle multiple asynchronous streams of network events. + /// + /// # Examples + /// ```rust + /// let config = NetworkConfig { + /// tcp_port: 3030, + /// quic_port: 3031, + /// session_timeout: 3600, + /// idle_connection_timeout: 600, + /// bootstrap_peer_multiaddr: "/ip4/192.168.1.1/tcp/3030".to_string(), + /// secret_key: "your_secret_key_here".to_string(), + /// }; + /// + /// let manager = NetworkManager::new(config); + /// + /// tokio::spawn(async move { + /// if let Err(e) = manager.run().await { + /// eprintln!("Network manager stopped with an error: {:?}", e); + /// } + /// }); + /// ``` + /// + /// This function will not return under normal operations and should be run within an async + /// context, such as a Tokio runtime. pub async fn run(mut self) -> Result<(), NetworkError> { loop { tokio::select! { @@ -91,6 +120,43 @@ impl GenericNetworkManager { } } + /// Registers a server-side protocol for handling SQMR queries with a specified buffer size. + /// This function will panic if the protocol is already registered. + /// + /// The `SqmrServerReceiver` returned by this function can be used in an asynchronous loop + /// to process incoming queries. + /// + /// # Parameters + /// - `protocol`: Identifier for the protocol. + /// - `buffer_size`: The size of the internal message buffer. + /// + /// # Returns + /// A `SqmrServerReceiver` that can be used to receive and handle incoming queries. + /// + /// # Examples + /// ```rust + /// use futures::StreamExt; + /// + /// let mut manager = setup_network_manager(); // setup_network_manager is a hypothetical helper function + /// let receiver = manager.register_sqmr_protocol_server("my_protocol_v1", 1024); + /// + /// tokio::spawn(async move { + /// while let Some(payload) = receiver.next().await { + /// match payload.query { + /// Ok(query) => { + /// println!("Received query: {:?}", query); + /// // Process query and send response + /// let response = "Response data".as_bytes().to_vec(); + /// payload.responses_sender.send(Bytes::from(response)).await.expect("Failed to send response"); + /// }, + /// Err(e) => { + /// eprintln!("Error parsing query: {:?}", e); + /// payload.report_sender.send(()).expect("Failed to send report"); + /// } + /// } + /// } + /// }); + /// ``` /// TODO: Support multiple protocols where they're all different versions of the same protocol pub fn register_sqmr_protocol_server( &mut self, @@ -124,9 +190,47 @@ impl GenericNetworkManager { }) } + /// Registers a client-side SQMR protocol with a buffer for outgoing messages. + /// This function will panic if the protocol is already registered. + /// + /// The `SqmrClientSender` returned by this function is used to send queries and asynchronously + /// handle responses. + /// + /// # Parameters + /// - `protocol`: Identifier for the protocol. + /// - `buffer_size`: The size of the internal message buffer. + /// + /// # Returns + /// A `SqmrClientSender` that can be used to send queries and handle responses. + /// + /// # Examples + /// ```rust + /// use futures::SinkExt; + /// + /// let mut manager = setup_network_manager(); // Assume setup_network_manager returns a configured manager + /// let sender = manager.register_sqmr_protocol_client("my_protocol_v1", 1024); + /// + /// tokio::spawn(async move { + /// let query = Bytes::from("data query".as_bytes()); + /// let (response_sender, response_receiver) = futures::channel::mpsc::channel(10); + /// let (report_sender, report_receiver) = futures::channel::oneshot::channel(); + /// + /// let payload = SqmrClientPayload { + /// query, + /// report_receiver, + /// responses_sender: response_sender.sink_map_err(|e| e.to_string()), + /// }; + /// sender.send(payload).await.expect("Failed to send query"); + /// + /// // Optionally, handle responses + /// tokio::spawn(async move { + /// while let Some(response) = response_receiver.next().await { + /// println!("Received response: {:?}", response); + /// } + /// }); + /// }); + /// ``` /// TODO: Support multiple protocols where they're all different versions of the same protocol - /// Register a new subscriber for sending a single query and receiving multiple responses. - /// Panics if the given protocol is already subscribed. pub fn register_sqmr_protocol_client( &mut self, protocol: String, @@ -160,8 +264,45 @@ impl GenericNetworkManager { Box::new(payload_sender) } - /// Register a new subscriber for broadcasting and receiving broadcasts for a given topic. - /// Panics if this topic is already subscribed. + /// Registers a topic for broadcasting messages across the network. + /// Panics if the topic is already registered. + /// + /// The `BroadcastSubscriberChannels` structure contains senders and receivers for managing + /// broadcast messages. + /// + /// # Parameters + /// - `topic`: The topic to subscribe to and broadcast messages. + /// - `buffer_size`: Buffer size for the broadcast channel. + /// + /// # Returns + /// A struct containing channels for sending and receiving broadcast messages. + /// + /// # Examples + /// ```rust + /// use futures::{SinkExt, StreamExt}; + /// + /// let mut manager = setup_network_manager(); + /// let topic = Topic::new("news_updates"); + /// let channels = manager.register_broadcast_topic(topic, 1024).unwrap(); + /// + /// // Sending a message to the topic + /// tokio::spawn(async move { + /// let message = Bytes::from("Hello, world!"); + /// channels.messages_to_broadcast_sender.send(message).await.expect("Failed to send message"); + /// }); + /// + /// // Receiving a message from the topic + /// tokio::spawn(async move { + /// while let Some((result, _report_sender)) = + /// channels.broadcasted_messages_receiver.next().await + /// { + /// match result { + /// Ok(message) => println!("Received broadcast: {:?}", message), + /// Err(e) => eprintln!("Error receiving broadcast: {:?}", e), + /// } + /// } + /// }); + /// ``` pub fn register_broadcast_topic( &mut self, topic: Topic,