Skip to content

Commit

Permalink
feat: add a helper function to StreamHandler to set up network channe…
Browse files Browse the repository at this point in the history
…ls (#2184)
  • Loading branch information
guy-starkware authored Nov 26, 2024
1 parent fcf5cce commit 82a8181
Showing 1 changed file with 44 additions and 2 deletions.
46 changes: 44 additions & 2 deletions crates/sequencing/papyrus_consensus/src/stream_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ type StreamKey = (PeerId, StreamId);
const CHANNEL_BUFFER_LENGTH: usize = 100;

#[derive(Debug, Clone)]
struct StreamData<T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> {
struct StreamData<
T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + 'static,
> {
next_message_id: MessageId,
// Last message ID. If None, it means we have not yet gotten to it.
fin_message_id: Option<MessageId>,
Expand All @@ -56,7 +58,7 @@ impl<T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError
/// - Buffering inbound messages and reporting them to the application in order.
/// - Sending outbound messages to the network, wrapped in StreamMessage.
pub struct StreamHandler<
T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>,
T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + 'static,
> {
// For each stream ID from the network, send the application a Receiver
// that will receive the messages in order. This allows sending such Receivers.
Expand Down Expand Up @@ -100,6 +102,46 @@ impl<T: Clone + Send + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversi
}
}

/// Create a new StreamHandler and start it running in a new task.
/// Gets network input/output channels and returns application input/output channels.
#[allow(clippy::type_complexity)]
pub fn get_channels(
inbound_network_receiver: BroadcastTopicServer<StreamMessage<T>>,
outbound_network_sender: BroadcastTopicClient<StreamMessage<T>>,
) -> (
mpsc::Sender<(StreamId, mpsc::Receiver<T>)>,
mpsc::Receiver<mpsc::Receiver<T>>,
tokio::task::JoinHandle<()>,
) {
// The inbound messages come into StreamHandler via inbound_network_receiver.
// The application gets the messages from inbound_internal_receiver
// (the StreamHandler keeps the inbound_internal_sender to pass the messages).
let (inbound_internal_sender, inbound_internal_receiver): (
mpsc::Sender<mpsc::Receiver<T>>,
mpsc::Receiver<mpsc::Receiver<T>>,
) = mpsc::channel(CHANNEL_BUFFER_LENGTH);
// The outbound messages that an application would like to send are:
// 1. Sent into outbound_internal_sender as tuples of (StreamId, Receiver)
// 2. Ingested by StreamHandler by its outbound_internal_receiver.
// 3. Broadcast by the StreamHandler using its outbound_network_sender.
let (outbound_internal_sender, outbound_internal_receiver): (
mpsc::Sender<(StreamId, mpsc::Receiver<T>)>,
mpsc::Receiver<(StreamId, mpsc::Receiver<T>)>,
) = mpsc::channel(CHANNEL_BUFFER_LENGTH);

let mut stream_handler = StreamHandler::<T>::new(
inbound_internal_sender, // Sender<Receiver<T>>,
inbound_network_receiver, // BroadcastTopicServer<StreamMessage<T>>,
outbound_internal_receiver, // Receiver<(StreamId, Receiver<T>)>,
outbound_network_sender, // BroadcastTopicClient<StreamMessage<T>>
);
let handle = tokio::spawn(async move {
stream_handler.run().await;
});

(outbound_internal_sender, inbound_internal_receiver, handle)
}

/// Listen for messages coming from the network and from the application.
/// - Outbound messages are wrapped as StreamMessage and sent to the network directly.
/// - Inbound messages are stripped of StreamMessage and buffered until they can be sent in the
Expand Down

0 comments on commit 82a8181

Please sign in to comment.