From 82a8181fd5414f47a4e795cf6a1caa104037810b Mon Sep 17 00:00:00 2001 From: guy-starkware Date: Tue, 26 Nov 2024 00:52:27 -0800 Subject: [PATCH] feat: add a helper function to StreamHandler to set up network channels (#2184) --- .../papyrus_consensus/src/stream_handler.rs | 46 ++++++++++++++++++- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/crates/sequencing/papyrus_consensus/src/stream_handler.rs b/crates/sequencing/papyrus_consensus/src/stream_handler.rs index 0531ba48a6..aef64421c7 100644 --- a/crates/sequencing/papyrus_consensus/src/stream_handler.rs +++ b/crates/sequencing/papyrus_consensus/src/stream_handler.rs @@ -30,7 +30,9 @@ type StreamKey = (PeerId, StreamId); const CHANNEL_BUFFER_LENGTH: usize = 100; #[derive(Debug, Clone)] -struct StreamData> + TryFrom, Error = ProtobufConversionError>> { +struct StreamData< + T: Clone + Into> + TryFrom, Error = ProtobufConversionError> + 'static, +> { next_message_id: MessageId, // Last message ID. If None, it means we have not yet gotten to it. fin_message_id: Option, @@ -56,7 +58,7 @@ impl> + TryFrom, Error = ProtobufConversionError /// - Buffering inbound messages and reporting them to the application in order. /// - Sending outbound messages to the network, wrapped in StreamMessage. pub struct StreamHandler< - T: Clone + Into> + TryFrom, Error = ProtobufConversionError>, + T: Clone + Into> + TryFrom, Error = ProtobufConversionError> + 'static, > { // For each stream ID from the network, send the application a Receiver // that will receive the messages in order. This allows sending such Receivers. @@ -100,6 +102,46 @@ impl> + TryFrom, Error = ProtobufConversi } } + /// Create a new StreamHandler and start it running in a new task. + /// Gets network input/output channels and returns application input/output channels. + #[allow(clippy::type_complexity)] + pub fn get_channels( + inbound_network_receiver: BroadcastTopicServer>, + outbound_network_sender: BroadcastTopicClient>, + ) -> ( + mpsc::Sender<(StreamId, mpsc::Receiver)>, + mpsc::Receiver>, + tokio::task::JoinHandle<()>, + ) { + // The inbound messages come into StreamHandler via inbound_network_receiver. + // The application gets the messages from inbound_internal_receiver + // (the StreamHandler keeps the inbound_internal_sender to pass the messages). + let (inbound_internal_sender, inbound_internal_receiver): ( + mpsc::Sender>, + mpsc::Receiver>, + ) = mpsc::channel(CHANNEL_BUFFER_LENGTH); + // The outbound messages that an application would like to send are: + // 1. Sent into outbound_internal_sender as tuples of (StreamId, Receiver) + // 2. Ingested by StreamHandler by its outbound_internal_receiver. + // 3. Broadcast by the StreamHandler using its outbound_network_sender. + let (outbound_internal_sender, outbound_internal_receiver): ( + mpsc::Sender<(StreamId, mpsc::Receiver)>, + mpsc::Receiver<(StreamId, mpsc::Receiver)>, + ) = mpsc::channel(CHANNEL_BUFFER_LENGTH); + + let mut stream_handler = StreamHandler::::new( + inbound_internal_sender, // Sender>, + inbound_network_receiver, // BroadcastTopicServer>, + outbound_internal_receiver, // Receiver<(StreamId, Receiver)>, + outbound_network_sender, // BroadcastTopicClient> + ); + let handle = tokio::spawn(async move { + stream_handler.run().await; + }); + + (outbound_internal_sender, inbound_internal_receiver, handle) + } + /// Listen for messages coming from the network and from the application. /// - Outbound messages are wrapped as StreamMessage and sent to the network directly. /// - Inbound messages are stripped of StreamMessage and buffered until they can be sent in the