Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consensus): dont fail when stream handler inbound receiver drops #2730

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions crates/sequencing/papyrus_consensus/src/stream_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type StreamKey = (PeerId, StreamId);

const CHANNEL_BUFFER_LENGTH: usize = 100;

// Use this struct for each inbound stream.
// Drop the struct when:
// (1) receiver on the other end is dropped,
// (2) fin message is received and all messages are sent.
#[derive(Debug, Clone)]
struct StreamData<
T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + 'static,
Expand Down Expand Up @@ -178,13 +182,38 @@ impl<T: Clone + Send + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversi
}
}

fn inbound_send(data: &mut StreamData<T>, message: StreamMessage<T>) {
// Returns true if the receiver for this stream is dropped.
fn inbound_send(data: &mut StreamData<T>, message: StreamMessage<T>) -> bool {
// TODO(guyn): reconsider the "expect" here.
let sender = &mut data.sender;
if let StreamMessageBody::Content(content) = message.message {
sender.try_send(content).expect("Send should succeed");
match sender.try_send(content) {
Ok(_) => {}
Err(e) => {
if e.is_disconnected() {
warn!(
"Sender is disconnected, dropping the message. StreamId: {}, \
MessageId: {}",
message.stream_id, message.message_id
);
return true;
} else if e.is_full() {
// TODO(guyn): replace panic with buffering of the message.
panic!(
"Sender is full, dropping the message. StreamId: {}, MessageId: {}",
message.stream_id, message.message_id
);
} else {
// TODO(guyn): replace panic with more graceful error handling
panic!("Unexpected error: {:?}", e);
}
}
};
data.next_message_id += 1;
return false;
}
// A Fin message is not sent. This is a no-op, can safely return true.
true
}

// Send the message to the network.
Expand Down Expand Up @@ -225,6 +254,7 @@ impl<T: Clone + Send + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversi
return;
}
};

let peer_id = metadata.originator_id;
let stream_id = message.stream_id;
let key = (peer_id, stream_id);
Expand Down Expand Up @@ -280,10 +310,14 @@ impl<T: Clone + Send + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversi
// This means we can just send the message without buffering it.
match message_id.cmp(&data.next_message_id) {
Ordering::Equal => {
Self::inbound_send(data, message);
Self::process_buffer(data);
let mut receiver_dropped = Self::inbound_send(data, message);
if !receiver_dropped {
receiver_dropped = Self::process_buffer(data);
}

if data.message_buffer.is_empty() && data.fin_message_id.is_some() {
if data.message_buffer.is_empty() && data.fin_message_id.is_some()
|| receiver_dropped
{
data.sender.close_channel();
self.inbound_stream_data.remove(&key);
}
Expand Down Expand Up @@ -323,9 +357,13 @@ impl<T: Clone + Send + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversi

// Tries to drain as many messages as possible from the buffer (in order),
// DOES NOT guarantee that the buffer will be empty after calling this function.
fn process_buffer(data: &mut StreamData<T>) {
// Returns true if the receiver for this stream is dropped.
fn process_buffer(data: &mut StreamData<T>) -> bool {
while let Some(message) = data.message_buffer.remove(&data.next_message_id) {
Self::inbound_send(data, message);
if Self::inbound_send(data, message) {
return true;
}
}
false
}
}
52 changes: 52 additions & 0 deletions crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,58 @@ mod tests {
);
}

#[tokio::test]
async fn inbound_close_channel() {
let (mut stream_handler, mut network_sender, mut inbound_channel_receiver, metadata, _, _) =
setup_test();

let stream_id = 127;
// Send two messages, no Fin.
for i in 0..2 {
let message = make_test_message(stream_id, i, false);
send(&mut network_sender, &metadata, message).await;
}

// Allow the StreamHandler to process the messages.
let join_handle = tokio::spawn(async move {
let _ = tokio::time::timeout(TIMEOUT, stream_handler.run()).await;
stream_handler
});
let mut stream_handler = join_handle.await.expect("Task should succeed");

let mut receiver = inbound_channel_receiver.next().await.unwrap();
for _ in 0..2 {
let _ = receiver.next().await.unwrap();
}

// Check that the stream handler contains the StreamData.
assert_eq!(stream_handler.inbound_stream_data.len(), 1);
assert_eq!(
stream_handler.inbound_stream_data.keys().next().unwrap(),
&(metadata.originator_id.clone(), stream_id)
);

// Close the channel.
drop(receiver);

// Send more messages.
// TODO(guyn): if we set this to 2..4 it fails... the last message opens a new StreamData!
for i in 2..3 {
let message = make_test_message(stream_id, i, false);
send(&mut network_sender, &metadata, message).await;
}

// Allow the StreamHandler to process the messages.
let join_handle = tokio::spawn(async move {
let _ = tokio::time::timeout(TIMEOUT, stream_handler.run()).await;
stream_handler
});
let stream_handler = join_handle.await.expect("Task should succeed");

// Check that the stream handler no longer contains the StreamData.
assert_eq!(stream_handler.inbound_stream_data.len(), 0);
}

// This test does two things:
// 1. Opens two outbound channels and checks that messages get correctly sent on both.
// 2. Closes the first channel and checks that Fin is sent and that the relevant structures
Expand Down
Loading