Skip to content

Commit

Permalink
fix(consensus): dont fail when stream handler inbound receiver drops
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-starkware committed Dec 17, 2024
1 parent b568dc7 commit 691a239
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 7 deletions.
53 changes: 46 additions & 7 deletions crates/sequencing/papyrus_consensus/src/stream_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type StreamKey = (PeerId, StreamId);

const CHANNEL_BUFFER_LENGTH: usize = 100;

// Use this struct for each inbound stream.
// Drop the struct when:
// (1) receiver on the other end is dropped,
// (2) fin message is received and all messages are sent.
#[derive(Debug, Clone)]
struct StreamData<
T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError> + 'static,
Expand Down Expand Up @@ -178,13 +182,38 @@ impl<T: Clone + Send + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversi
}
}

fn inbound_send(data: &mut StreamData<T>, message: StreamMessage<T>) {
// Tries to forward the content of `message` to the receiver of this inbound stream.
// Returns false after a successful send (and advances `data.next_message_id`).
// Returns true if nothing more can be delivered for this message: either the receiver was
// dropped (the channel reports it is disconnected), or the message is not a Content message
// (a Fin), in which case there is nothing to send.
// Panics if the channel is full or if `try_send` fails for any other reason.
fn inbound_send(data: &mut StreamData<T>, message: StreamMessage<T>) -> bool {
// TODO(guyn): reconsider the "expect" here.
let sender = &mut data.sender;
if let StreamMessageBody::Content(content) = message.message {
sender.try_send(content).expect("Send should succeed");
match sender.try_send(content) {
Ok(_) => {}
Err(e) => {
if e.is_disconnected() {
// The receiving end is gone: log and report it to the caller instead of panicking.
warn!(
"Sender is disconnected, dropping the message. StreamId: {}, \
MessageId: {}",
message.stream_id, message.message_id
);
return true;
} else if e.is_full() {
// TODO(guyn): replace panic with buffering of the message.
panic!(
"Sender is full, dropping the message. StreamId: {}, MessageId: {}",
message.stream_id, message.message_id
);
} else {
// TODO(guyn): replace panic with more graceful error handling
panic!("Unexpected error: {:?}", e);
}
}
};
// The content was handed to the channel, so the following message id is expected next.
data.next_message_id += 1;
return false;
}
// Not a Content message (a Fin): nothing is sent to the receiver. Note that this also
// returns true, i.e. the same value that is returned when the receiver is dropped.
true
}

// Send the message to the network.
Expand Down Expand Up @@ -225,6 +254,7 @@ impl<T: Clone + Send + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversi
return;
}
};

let peer_id = metadata.originator_id;
let stream_id = message.stream_id;
let key = (peer_id, stream_id);
Expand Down Expand Up @@ -280,10 +310,15 @@ impl<T: Clone + Send + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversi
// This means we can just send the message without buffering it.
match message_id.cmp(&data.next_message_id) {
Ordering::Equal => {
Self::inbound_send(data, message);
Self::process_buffer(data);
// receiver_dropped will be true if the receiver for this stream is dropped.
let mut receiver_dropped = Self::inbound_send(data, message);
if !receiver_dropped {
receiver_dropped = Self::process_buffer(data);
}

if data.message_buffer.is_empty() && data.fin_message_id.is_some() {
if data.message_buffer.is_empty() && data.fin_message_id.is_some()
|| receiver_dropped
{
data.sender.close_channel();
self.inbound_stream_data.remove(&key);
}
Expand Down Expand Up @@ -323,9 +358,13 @@ impl<T: Clone + Send + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversi

// Tries to drain as many messages as possible from the buffer, in order of message id,
// starting at `data.next_message_id`. Draining stops at the first id that is not in the
// buffer, so this DOES NOT guarantee that the buffer will be empty after the call.
fn process_buffer(data: &mut StreamData<T>) {
// Returns true as soon as `inbound_send` returns true for a drained message (the receiver
// for this stream is dropped, or the drained message is a Fin with no content to send).
// Returns false when the loop ends because the next expected message is not buffered.
fn process_buffer(data: &mut StreamData<T>) -> bool {
// A successful `inbound_send` advances `next_message_id`, so each iteration looks up the
// following id.
while let Some(message) = data.message_buffer.remove(&data.next_message_id) {
Self::inbound_send(data, message);
if Self::inbound_send(data, message) {
return true;
}
}
false
}
}
52 changes: 52 additions & 0 deletions crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,58 @@ mod tests {
);
}

#[tokio::test]
async fn inbound_close_channel() {
    // Scenario: the consumer drops the receiver of an inbound stream while the stream is still
    // open (no Fin). The next message for that stream must make the handler discard its
    // StreamData rather than fail.
    let (mut stream_handler, mut network_sender, mut inbound_channel_receiver, metadata, _, _) =
        setup_test();
    let stream_id = 127;

    // Two content messages (ids 0 and 1), no Fin.
    for message_id in 0..2 {
        let msg = make_test_message(stream_id, message_id, false);
        send(&mut network_sender, &metadata, msg).await;
    }

    // Run the handler for at most TIMEOUT so it can process the messages, then take it back.
    let mut stream_handler = tokio::spawn(async move {
        let _ = tokio::time::timeout(TIMEOUT, stream_handler.run()).await;
        stream_handler
    })
    .await
    .expect("Task should succeed");

    // The handler should have opened one stream; read both messages from it.
    let mut stream_receiver = inbound_channel_receiver.next().await.unwrap();
    for _ in 0..2 {
        let _ = stream_receiver.next().await.unwrap();
    }

    // The StreamData for this (peer, stream) pair is still tracked by the handler.
    assert_eq!(stream_handler.inbound_stream_data.len(), 1);
    let expected_key = (metadata.originator_id.clone(), stream_id);
    assert_eq!(stream_handler.inbound_stream_data.keys().next().unwrap(), &expected_key);

    // Close the channel from the receiving side.
    drop(stream_receiver);

    // Send one more message on the same stream.
    // TODO(guyn): if we set this to 2..4 it fails... the last message opens a new StreamData!
    for message_id in 2..3 {
        let msg = make_test_message(stream_id, message_id, false);
        send(&mut network_sender, &metadata, msg).await;
    }

    // Run the handler again for at most TIMEOUT so it processes the new message.
    let stream_handler = tokio::spawn(async move {
        let _ = tokio::time::timeout(TIMEOUT, stream_handler.run()).await;
        stream_handler
    })
    .await
    .expect("Task should succeed");

    // The StreamData must have been removed once the handler saw the dropped receiver.
    assert_eq!(stream_handler.inbound_stream_data.len(), 0);
}

// This test does two things:
// 1. Opens two outbound channels and checks that messages get correctly sent on both.
// 2. Closes the first channel and checks that Fin is sent and that the relevant structures
Expand Down

0 comments on commit 691a239

Please sign in to comment.