-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(consensus): dont fail when stream handler inbound receiver drops #2730
base: main
Are you sure you want to change the base?
Conversation
e4a7977
to
75eaa43
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 2 of 2 files at r1, all commit messages.
Reviewable status: all files reviewed, 5 unresolved discussions (waiting on @guy-starkware)
crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
line 412 at r1 (raw file):
// Allow the StreamHandler to process the messages. let join_handle = tokio::spawn(async move { let _ = tokio::time::timeout(TIMEOUT, stream_handler.run()).await;
I think often it's better not to have timeouts like this in a test. Normally tests are run with a timeout on the whole test, and so this explicit timeout is just adding complexity to your test code.
Code quote:
let _ = tokio::time::timeout(TIMEOUT, stream_handler.run()).await;
crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
line 433 at r1 (raw file):
// Send more messages. // TODO(guyn): if we set this to 2..4 it fails... the last message opens a new StreamData!
this is the follow up "ignore late messages"?
Code quote:
// TODO(guyn): if we set this to 2..4 it fails... the last message opens a new StreamData!
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 185 at r1 (raw file):
} fn inbound_send(data: &mut StreamData<T>, message: StreamMessage<T>) -> bool {
Suggestion:
// Returns false if the receiver is no longer subscribed to this stream.
fn inbound_send(data: &mut StreamData<T>, message: StreamMessage<T>) -> bool {
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 204 at r1 (raw file):
"Sender is full, dropping the message. StreamId: {}, MessageId: {}", message.stream_id, message.message_id );
Good catch. I would want this to just stay in the internal buffer for this stream_id (where we keep the out of order messages). I don't see this as an error. That buffer though must be limited in size to prevent OOM
Code quote:
} else if e.is_full() {
// TODO(guyn): replace panic with more graceful error handling
panic!(
"Sender is full, dropping the message. StreamId: {}, MessageId: {}",
message.stream_id, message.message_id
);
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 313 at r1 (raw file):
Ordering::Equal => { // success will be false if the receiver is closed. let mut success = Self::inbound_send(data, message);
WDYT?
Suggestion:
let mut receiver_unsubscribed = Self::inbound_send(data, message);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: all files reviewed, 5 unresolved discussions (waiting on @matan-starkware)
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 204 at r1 (raw file):
Previously, matan-starkware wrote…
Good catch. I would want this to just stay in the internal buffer for this stream_id (where we keep the out of order messages). I don't see this as an error. That buffer though must be limited in size to prevent OOM
Good idea!
It looks like this may be a little more complicated because of the way the code is written right now. Could require a bit of a refactor to make this work.
Also, I'm not sure what happens after we buffer that message. If no other messages come in, that buffer will never get triggered and you may end up with all messages buffered forever.
I'm putting a TODO on this option and we'll do it on the next PR.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 313 at r1 (raw file):
Previously, matan-starkware wrote…
WDYT?
yeah, I was feeling a bit uneasy about "success". I'll take your idea. Maybe "receiver_dropped" is more obvious? Because as far as I can tell, the only way to close the channel from that end is to drop it.
crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
line 412 at r1 (raw file):
Previously, matan-starkware wrote…
I think often it's better not to have timeouts like this in a test. Normally tests are run with a timeout on the whole test, and so this explicit timeout is just adding complexity to your test code.
I don't know how to make the stream handler's run()
function quit gracefully, so instead I'm using a timeout. This is not "in case things run too long" this is "after messages are consumed, quit the run".
crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
line 433 at r1 (raw file):
Previously, matan-starkware wrote…
this is the follow up "ignore late messages"?
Exactly.
crates/sequencing/papyrus_consensus/src/stream_handler.rs
line 185 at r1 (raw file):
} fn inbound_send(data: &mut StreamData<T>, message: StreamMessage<T>) -> bool {
Done.
75eaa43
to
691a239
Compare
In the StreamHandler inbound case (listening to the network and sending internal messages into Consensus) there's a case where the Consensus drops the receiver.
This is a receiver that the StreamHandler sends whenever it sees a new stream, and the receiver should stay open for as long as that stream is not done.
However, the Consensus may choose to drop it (e.g., when moving to a new height) and that leaves the StreamHandler with a broken pipe, and also some messages that should go into that pipe.
As a first step, I'm making this no longer a panic, but if the receiver is dropped, then the sender inside StreamHandler is dropped, too (along with the remaining StreamData, which includes the message id, cached messages, etc).
This still doesn't address what happens to all the other messages that should come into StreamHandler. I'll handle that in the next PR.