Skip to content

Commit

Permalink
chore(network): create docstrings for network manager
Browse files Browse the repository at this point in the history
  • Loading branch information
eitanm-starkware committed Jul 24, 2024
1 parent 3bcfcc6 commit a5d3ac0
Showing 1 changed file with 145 additions and 4 deletions.
149 changes: 145 additions & 4 deletions crates/papyrus_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,35 @@ pub struct GenericNetworkManager<SwarmT: SwarmTrait> {
}

impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
/// Starts processing network events. This function runs indefinitely, handling connections,
/// disconnections, messages, and other network-related events as they occur.
///
/// The function processes events from the `swarm`, handles incoming and outgoing SQMR messages,
/// and manages broadcasts and peer reports. It utilizes `tokio::select!` to concurrently
/// handle multiple asynchronous streams of network events.
///
/// # Examples
/// ```rust
/// let config = NetworkConfig {
/// tcp_port: 3030,
/// quic_port: 3031,
/// session_timeout: 3600,
/// idle_connection_timeout: 600,
/// bootstrap_peer_multiaddr: "/ip4/192.168.1.1/tcp/3030".to_string(),
/// secret_key: "your_secret_key_here".to_string(),
/// };
///
/// let manager = NetworkManager::new(config);
///
/// tokio::spawn(async move {
/// if let Err(e) = manager.run().await {
/// eprintln!("Network manager stopped with an error: {:?}", e);
/// }
/// });
/// ```
///
/// This function will not return under normal operations and should be run within an async
/// context, such as a Tokio runtime.
pub async fn run(mut self) -> Result<(), NetworkError> {
loop {
tokio::select! {
Expand Down Expand Up @@ -91,6 +120,43 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
}
}

/// Registers a server-side protocol for handling SQMR queries with a specified buffer size.
/// This function will panic if the protocol is already registered.
///
/// The `SqmrServerReceiver` returned by this function can be used in an asynchronous loop
/// to process incoming queries.
///
/// # Parameters
/// - `protocol`: Identifier for the protocol.
/// - `buffer_size`: The size of the internal message buffer.
///
/// # Returns
/// A `SqmrServerReceiver` that can be used to receive and handle incoming queries.
///
/// # Examples
/// ```rust
/// use futures::StreamExt;
///
/// let mut manager = setup_network_manager(); // setup_network_manager is a hypothetical helper function
/// let receiver = manager.register_sqmr_protocol_server("my_protocol_v1", 1024);
///
/// tokio::spawn(async move {
/// while let Some(payload) = receiver.next().await {
/// match payload.query {
/// Ok(query) => {
/// println!("Received query: {:?}", query);
/// // Process query and send response
/// let response = "Response data".as_bytes().to_vec();
/// payload.responses_sender.send(Bytes::from(response)).await.expect("Failed to send response");
/// },
/// Err(e) => {
/// eprintln!("Error parsing query: {:?}", e);
/// payload.report_sender.send(()).expect("Failed to send report");
/// }
/// }
/// }
/// });
/// ```
/// TODO: Support multiple protocols where they're all different versions of the same protocol
pub fn register_sqmr_protocol_server<Query, Response>(
&mut self,
Expand Down Expand Up @@ -124,9 +190,47 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
})
}

/// Registers a client-side SQMR protocol with a buffer for outgoing messages.
/// This function will panic if the protocol is already registered.
///
/// The `SqmrClientSender` returned by this function is used to send queries and asynchronously
/// handle responses.
///
/// # Parameters
/// - `protocol`: Identifier for the protocol.
/// - `buffer_size`: The size of the internal message buffer.
///
/// # Returns
/// A `SqmrClientSender` that can be used to send queries and handle responses.
///
/// # Examples
/// ```rust
/// use futures::SinkExt;
///
/// let mut manager = setup_network_manager(); // Assume setup_network_manager returns a configured manager
/// let sender = manager.register_sqmr_protocol_client("my_protocol_v1", 1024);
///
/// tokio::spawn(async move {
/// let query = Bytes::from("data query".as_bytes());
/// let (response_sender, response_receiver) = futures::channel::mpsc::channel(10);
/// let (report_sender, report_receiver) = futures::channel::oneshot::channel();
///
/// let payload = SqmrClientPayload {
/// query,
/// report_receiver,
/// responses_sender: response_sender.sink_map_err(|e| e.to_string()),
/// };
/// sender.send(payload).await.expect("Failed to send query");
///
/// // Optionally, handle responses
/// tokio::spawn(async move {
/// while let Some(response) = response_receiver.next().await {
/// println!("Received response: {:?}", response);
/// }
/// });
/// });
/// ```
/// TODO: Support multiple protocols where they're all different versions of the same protocol
/// Register a new subscriber for sending a single query and receiving multiple responses.
/// Panics if the given protocol is already subscribed.
pub fn register_sqmr_protocol_client<Query, Response>(
&mut self,
protocol: String,
Expand Down Expand Up @@ -160,8 +264,45 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
Box::new(payload_sender)
}

/// Register a new subscriber for broadcasting and receiving broadcasts for a given topic.
/// Panics if this topic is already subscribed.
/// Registers a topic for broadcasting messages across the network.
/// Panics if the topic is already registered.
///
/// The `BroadcastSubscriberChannels` structure contains senders and receivers for managing
/// broadcast messages.
///
/// # Parameters
/// - `topic`: The topic to subscribe to and broadcast messages.
/// - `buffer_size`: Buffer size for the broadcast channel.
///
/// # Returns
/// A struct containing channels for sending and receiving broadcast messages.
///
/// # Examples
/// ```rust
/// use futures::{SinkExt, StreamExt};
///
/// let mut manager = setup_network_manager();
/// let topic = Topic::new("news_updates");
/// let channels = manager.register_broadcast_topic(topic, 1024).unwrap();
///
/// // Sending a message to the topic
/// tokio::spawn(async move {
/// let message = Bytes::from("Hello, world!");
/// channels.messages_to_broadcast_sender.send(message).await.expect("Failed to send message");
/// });
///
/// // Receiving a message from the topic
/// tokio::spawn(async move {
/// while let Some((result, _report_sender)) =
/// channels.broadcasted_messages_receiver.next().await
/// {
/// match result {
/// Ok(message) => println!("Received broadcast: {:?}", message),
/// Err(e) => eprintln!("Error receiving broadcast: {:?}", e),
/// }
/// }
/// });
/// ```
pub fn register_broadcast_topic<T>(
&mut self,
topic: Topic,
Expand Down

0 comments on commit a5d3ac0

Please sign in to comment.