From 691a239cb2900d74b4fe13a98cfe2914caf87ee2 Mon Sep 17 00:00:00 2001 From: Guy Nir Date: Tue, 17 Dec 2024 15:40:22 +0200 Subject: [PATCH] fix(consensus): dont fail when stream handler inbound receiver drops --- .../papyrus_consensus/src/stream_handler.rs | 53 ++++++++++++++++--- .../src/stream_handler_test.rs | 52 ++++++++++++++++++ 2 files changed, 98 insertions(+), 7 deletions(-) diff --git a/crates/sequencing/papyrus_consensus/src/stream_handler.rs b/crates/sequencing/papyrus_consensus/src/stream_handler.rs index 7101579dce..13089770ee 100644 --- a/crates/sequencing/papyrus_consensus/src/stream_handler.rs +++ b/crates/sequencing/papyrus_consensus/src/stream_handler.rs @@ -28,6 +28,10 @@ type StreamKey = (PeerId, StreamId); const CHANNEL_BUFFER_LENGTH: usize = 100; +// Use this struct for each inbound stream. +// Drop the struct when: +// (1) receiver on the other end is dropped, +// (2) fin message is received and all messages are sent. #[derive(Debug, Clone)] struct StreamData< T: Clone + Into> + TryFrom, Error = ProtobufConversionError> + 'static, @@ -178,13 +182,38 @@ impl> + TryFrom, Error = ProtobufConversi } } - fn inbound_send(data: &mut StreamData, message: StreamMessage) { + // Returns true if the receiver for this stream is dropped. + fn inbound_send(data: &mut StreamData, message: StreamMessage) -> bool { // TODO(guyn): reconsider the "expect" here. let sender = &mut data.sender; if let StreamMessageBody::Content(content) = message.message { - sender.try_send(content).expect("Send should succeed"); + match sender.try_send(content) { + Ok(_) => {} + Err(e) => { + if e.is_disconnected() { + warn!( + "Sender is disconnected, dropping the message. StreamId: {}, \ + MessageId: {}", + message.stream_id, message.message_id + ); + return true; + } else if e.is_full() { + // TODO(guyn): replace panic with buffering of the message. + panic!( + "Sender is full, dropping the message. StreamId: {}, MessageId: {}", + message.stream_id, message.message_id + ); + } else { + // TODO(guyn): replace panic with more graceful error handling + panic!("Unexpected error: {:?}", e); + } + } + }; data.next_message_id += 1; + return false; } + // A Fin message is not sent. This is a no-op, can safely return true. + true } // Send the message to the network. @@ -225,6 +254,7 @@ impl> + TryFrom, Error = ProtobufConversi return; } }; + let peer_id = metadata.originator_id; let stream_id = message.stream_id; let key = (peer_id, stream_id); @@ -280,10 +310,15 @@ impl> + TryFrom, Error = ProtobufConversi // This means we can just send the message without buffering it. match message_id.cmp(&data.next_message_id) { Ordering::Equal => { - Self::inbound_send(data, message); - Self::process_buffer(data); + // receiver_dropped will be true if the receiver for this stream is dropped. + let mut receiver_dropped = Self::inbound_send(data, message); + if !receiver_dropped { + receiver_dropped = Self::process_buffer(data); + } - if data.message_buffer.is_empty() && data.fin_message_id.is_some() { + if data.message_buffer.is_empty() && data.fin_message_id.is_some() + || receiver_dropped + { data.sender.close_channel(); self.inbound_stream_data.remove(&key); } @@ -323,9 +358,13 @@ impl> + TryFrom, Error = ProtobufConversi // Tries to drain as many messages as possible from the buffer (in order), // DOES NOT guarantee that the buffer will be empty after calling this function. - fn process_buffer(data: &mut StreamData) { + // Returns true if the receiver for this stream is dropped. + fn process_buffer(data: &mut StreamData) -> bool { while let Some(message) = data.message_buffer.remove(&data.next_message_id) { - Self::inbound_send(data, message); + if Self::inbound_send(data, message) { + return true; + } } + false } } diff --git a/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs b/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs index 6e6e720405..46e1121892 100644 --- a/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs +++ b/crates/sequencing/papyrus_consensus/src/stream_handler_test.rs @@ -395,6 +395,58 @@ mod tests { ); } + #[tokio::test] + async fn inbound_close_channel() { + let (mut stream_handler, mut network_sender, mut inbound_channel_receiver, metadata, _, _) = + setup_test(); + + let stream_id = 127; + // Send two messages, no Fin. + for i in 0..2 { + let message = make_test_message(stream_id, i, false); + send(&mut network_sender, &metadata, message).await; + } + + // Allow the StreamHandler to process the messages. + let join_handle = tokio::spawn(async move { + let _ = tokio::time::timeout(TIMEOUT, stream_handler.run()).await; + stream_handler + }); + let mut stream_handler = join_handle.await.expect("Task should succeed"); + + let mut receiver = inbound_channel_receiver.next().await.unwrap(); + for _ in 0..2 { + let _ = receiver.next().await.unwrap(); + } + + // Check that the stream handler contains the StreamData. + assert_eq!(stream_handler.inbound_stream_data.len(), 1); + assert_eq!( + stream_handler.inbound_stream_data.keys().next().unwrap(), + &(metadata.originator_id.clone(), stream_id) + ); + + // Close the channel. + drop(receiver); + + // Send more messages. + // TODO(guyn): if we set this to 2..4 it fails... the last message opens a new StreamData! + for i in 2..3 { + let message = make_test_message(stream_id, i, false); + send(&mut network_sender, &metadata, message).await; + } + + // Allow the StreamHandler to process the messages. + let join_handle = tokio::spawn(async move { + let _ = tokio::time::timeout(TIMEOUT, stream_handler.run()).await; + stream_handler + }); + let stream_handler = join_handle.await.expect("Task should succeed"); + + // Check that the stream handler no longer contains the StreamData. + assert_eq!(stream_handler.inbound_stream_data.len(), 0); + } + // This test does two things: // 1. Opens two outbound channels and checks that messages get correctly sent on both. // 2. Closes the first channel and checks that Fin is sent and that the relevant structures