From be4b2b6aee509bbd99547d05bd01046ff731453a Mon Sep 17 00:00:00 2001 From: Guy Nir Date: Wed, 18 Dec 2024 11:48:37 +0200 Subject: [PATCH] fix(consensus): make sure StreamHandler sends receiver on message zero --- .../papyrus_consensus/src/stream_handler.rs | 21 +++++++-- .../src/stream_handler_test.rs | 47 ++++++++++--------- 2 files changed, 42 insertions(+), 26 deletions(-) diff --git a/crates/sequencing/papyrus_consensus/src/stream_handler.rs b/crates/sequencing/papyrus_consensus/src/stream_handler.rs index c9af23234a..bac8b6a999 100644 --- a/crates/sequencing/papyrus_consensus/src/stream_handler.rs +++ b/crates/sequencing/papyrus_consensus/src/stream_handler.rs @@ -41,17 +41,21 @@ struct StreamData< fin_message_id: Option, max_message_id_received: MessageId, sender: mpsc::Sender, + // Keep the receiver until it is time to send it to the application. + receiver: Option>, // A buffer for messages that were received out of order. message_buffer: HashMap>, } impl> + TryFrom, Error = ProtobufConversionError>> StreamData { - fn new(sender: mpsc::Sender) -> Self { + fn new() -> Self { + let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_LENGTH); StreamData { next_message_id: 0, fin_message_id: None, max_message_id_received: 0, sender, + receiver: Some(receiver), message_buffer: HashMap::new(), } } @@ -187,6 +191,13 @@ impl> + TryFrom, Error = ProtobufConversi // TODO(guyn): reconsider the "expect" here. let sender = &mut data.sender; if let StreamMessageBody::Content(content) = message.message { + if message.message_id == 0 { + // TODO(guyn): consider the expect in both cases. + // If this is the first message, send the receiver to the application. + let receiver = data.receiver.take().expect("Receiver should exist"); + // Send the receiver to the application. + self.inbound_channel_sender.try_send(receiver).expect("Send should succeed"); + } match sender.try_send(content) { Ok(_) => {} Err(e) => { @@ -266,10 +277,10 @@ impl> + TryFrom, Error = ProtobufConversi Vacant(_) => { // If we received a message for a stream that we have not seen before, // we need to create a new receiver for it. - let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_LENGTH); - // TODO(guyn): reconsider the "expect" here. - self.inbound_channel_sender.try_send(receiver).expect("Send should succeed"); - StreamData::new(sender) + // let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_LENGTH); + // // TODO(guyn): reconsider the "expect" here. + // self.inbound_channel_sender.try_send(receiver).expect("Send should succeed"); + StreamData::new() } }; diff --git a/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs b/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs index c6f3a83e74..10f7935b77 100644 --- a/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs +++ b/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs @@ -173,16 +173,24 @@ mod tests { }); let mut stream_handler = join_handle.await.expect("Task should succeed"); - // Get the receiver for the stream. - let mut receiver = inbound_channel_receiver.next().await.unwrap(); - // Check that the channel is empty (no messages were sent yet). - assert!(receiver.try_next().is_err()); + // No receiver should be created yet. + assert!(inbound_channel_receiver.try_next().is_err()); assert_eq!(stream_handler.inbound_stream_data.len(), 1); assert_eq!( stream_handler.inbound_stream_data[&(peer_id.clone(), stream_id)].message_buffer.len(), 5 ); + // Still waiting for message 0. + assert_eq!( + stream_handler.inbound_stream_data[&(peer_id.clone(), stream_id)].next_message_id, + 0 + ); + // Has a receiver, waiting to be sent when message 0 is received. + assert!( + stream_handler.inbound_stream_data[&(peer_id.clone(), stream_id)].receiver.is_some() + ); + let range: Vec = (1..6).collect(); let keys: Vec = stream_handler.inbound_stream_data[&(peer_id, stream_id)] .message_buffer @@ -203,6 +211,9 @@ mod tests { let stream_handler = join_handle.await.expect("Task should succeed"); assert!(stream_handler.inbound_stream_data.is_empty()); + // Get the receiver for the stream. + let mut receiver = inbound_channel_receiver.next().await.unwrap(); + for _ in 0..5 { // message number 5 is Fin, so it will not be sent! let _ = receiver.next().await.unwrap(); @@ -287,23 +298,8 @@ mod tests { &(1..10).collect::>() )); - // Get the receiver for the first stream. - let mut receiver1 = inbound_channel_receiver.next().await.unwrap(); - - // Check that the channel is empty (no messages were sent yet). - assert!(receiver1.try_next().is_err()); - - // Get the receiver for the second stream. - let mut receiver2 = inbound_channel_receiver.next().await.unwrap(); - - // Check that the channel is empty (no messages were sent yet). - assert!(receiver2.try_next().is_err()); - - // Get the receiver for the third stream. - let mut receiver3 = inbound_channel_receiver.next().await.unwrap(); - - // Check that the channel is empty (no messages were sent yet). - assert!(receiver3.try_next().is_err()); + // None of the streams should have emitted a receiver yet. + assert!(inbound_channel_receiver.try_next().is_err()); // Send the last message on stream_id1: send(&mut network_sender, &inbound_metadata, make_test_message(stream_id1, 0, false)).await; @@ -314,6 +310,9 @@ mod tests { stream_handler }); + // Get the receiver for the first stream. + let mut receiver1 = inbound_channel_receiver.next().await.unwrap(); + // Should be able to read all the messages for stream_id1. for _ in 0..9 { // message number 9 is Fin, so it will not be sent! @@ -334,6 +333,9 @@ mod tests { stream_handler }); + // Get the receiver for the second stream. + let mut receiver2 = inbound_channel_receiver.next().await.unwrap(); + // Should be able to read all the messages for stream_id2. for _ in 0..5 { // message number 5 is Fin, so it will not be sent! @@ -355,6 +357,9 @@ mod tests { stream_handler }); + // Get the receiver for the first stream. + let mut receiver3 = inbound_channel_receiver.next().await.unwrap(); + let stream_handler = join_handle.await.expect("Task should succeed"); for _ in 0..10 { // All messages are received, including number 9 which is not Fin