Skip to content

Commit

Permalink
fix(consensus): make sure StreamHandler sends receiver on message zero
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-starkware committed Dec 23, 2024
1 parent 1af62a9 commit 72b34b3
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 26 deletions.
18 changes: 13 additions & 5 deletions crates/sequencing/papyrus_consensus/src/stream_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,22 @@ struct StreamData<
// Last message ID. If None, it means we have not yet gotten to it.
fin_message_id: Option<MessageId>,
max_message_id_received: MessageId,
// Keep the receiver until it is time to send it to the application.
receiver: Option<mpsc::Receiver<T>>,
sender: mpsc::Sender<T>,
// A buffer for messages that were received out of order.
message_buffer: HashMap<MessageId, StreamMessage<T>>,
}

impl<T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> StreamData<T> {
fn new(sender: mpsc::Sender<T>) -> Self {
fn new() -> Self {
let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_LENGTH);
StreamData {
next_message_id: 0,
fin_message_id: None,
max_message_id_received: 0,
sender,
receiver: Some(receiver),
message_buffer: HashMap::new(),
}
}
Expand Down Expand Up @@ -187,6 +191,13 @@ impl<T: Clone + Send + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversi
// TODO(guyn): reconsider the "expect" here.
let sender = &mut data.sender;
if let StreamMessageBody::Content(content) = message.message {
if message.message_id == 0 {
// TODO(guyn): consider the expect in both cases.
// If this is the first message, send the receiver to the application.
let receiver = data.receiver.take().expect("Receiver should exist");
// Send the receiver to the application.
self.inbound_channel_sender.try_send(receiver).expect("Send should succeed");
}
match sender.try_send(content) {
Ok(_) => {}
Err(e) => {
Expand Down Expand Up @@ -265,10 +276,7 @@ impl<T: Clone + Send + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversi
Vacant(_) => {
// If we received a message for a stream that we have not seen before,
// we need to create a new receiver for it.
let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_LENGTH);
// TODO(guyn): reconsider the "expect" here.
self.inbound_channel_sender.try_send(receiver).expect("Send should succeed");
StreamData::new(sender)
StreamData::new()
}
};
if let Some(data) = self.handle_message_inner(message, metadata, data) {
Expand Down
47 changes: 26 additions & 21 deletions crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,24 @@ mod tests {
});
let mut stream_handler = join_handle.await.expect("Task should succeed");

// Get the receiver for the stream.
let mut receiver = inbound_channel_receiver.next().await.unwrap();
// Check that the channel is empty (no messages were sent yet).
assert!(receiver.try_next().is_err());
// No receiver should be created yet.
assert!(inbound_channel_receiver.try_next().is_err());

assert_eq!(stream_handler.inbound_stream_data.len(), 1);
assert_eq!(
stream_handler.inbound_stream_data[&(peer_id.clone(), stream_id)].message_buffer.len(),
5
);
// Still waiting for message 0.
assert_eq!(
stream_handler.inbound_stream_data[&(peer_id.clone(), stream_id)].next_message_id,
0
);
// Has a receiver, waiting to be sent when message 0 is received.
assert!(
stream_handler.inbound_stream_data[&(peer_id.clone(), stream_id)].receiver.is_some()
);

let range: Vec<u64> = (1..6).collect();
let keys: Vec<u64> = stream_handler.inbound_stream_data[&(peer_id, stream_id)]
.message_buffer
Expand All @@ -202,6 +210,9 @@ mod tests {
let stream_handler = join_handle.await.expect("Task should succeed");
assert!(stream_handler.inbound_stream_data.is_empty());

// Get the receiver for the stream.
let mut receiver = inbound_channel_receiver.next().await.unwrap();

for _ in 0..5 {
// message number 5 is Fin, so it will not be sent!
let _ = receiver.next().await.unwrap();
Expand Down Expand Up @@ -286,23 +297,8 @@ mod tests {
&(1..10).collect::<Vec<_>>()
));

// Get the receiver for the first stream.
let mut receiver1 = inbound_channel_receiver.next().await.unwrap();

// Check that the channel is empty (no messages were sent yet).
assert!(receiver1.try_next().is_err());

// Get the receiver for the second stream.
let mut receiver2 = inbound_channel_receiver.next().await.unwrap();

// Check that the channel is empty (no messages were sent yet).
assert!(receiver2.try_next().is_err());

// Get the receiver for the third stream.
let mut receiver3 = inbound_channel_receiver.next().await.unwrap();

// Check that the channel is empty (no messages were sent yet).
assert!(receiver3.try_next().is_err());
// None of the streams should have emitted a receiver yet.
assert!(inbound_channel_receiver.try_next().is_err());

// Send the last message on stream_id1:
send(&mut network_sender, &inbound_metadata, make_test_message(stream_id1, 0, false)).await;
Expand All @@ -313,6 +309,9 @@ mod tests {
stream_handler
});

// Get the receiver for the first stream.
let mut receiver1 = inbound_channel_receiver.next().await.unwrap();

// Should be able to read all the messages for stream_id1.
for _ in 0..9 {
// message number 9 is Fin, so it will not be sent!
Expand All @@ -333,6 +332,9 @@ mod tests {
stream_handler
});

// Get the receiver for the second stream.
let mut receiver2 = inbound_channel_receiver.next().await.unwrap();

// Should be able to read all the messages for stream_id2.
for _ in 0..5 {
// message number 5 is Fin, so it will not be sent!
Expand All @@ -354,6 +356,9 @@ mod tests {
stream_handler
});

// Get the receiver for the third stream.
let mut receiver3 = inbound_channel_receiver.next().await.unwrap();

let stream_handler = join_handle.await.expect("Task should succeed");
for _ in 0..10 {
// All messages are received, including number 9 which is not Fin
Expand Down

0 comments on commit 72b34b3

Please sign in to comment.