Report RPC Errors to the application on peer disconnections #5658

Closed
wants to merge 4 commits into from
29 changes: 27 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
@@ -207,9 +207,9 @@ pub enum OutboundSubstreamState<E: EthSpec> {
/// Keeps track of the actual request sent.
request: OutboundRequest<E>,
},
-    /// Closing an outbound substream>
+    /// Closing an outbound substream.
    Closing(Box<OutboundFramed<Stream, E>>),
-    /// Temporary state during processing
+    /// Temporary state during processing.
Poisoned,
}

@@ -352,6 +352,31 @@ where
!matches!(self.state, HandlerState::Deactivated)
}

// NOTE: This function gets polled to completion upon a connection close.
fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
// Inform the network behaviour of any failed requests

while let Some(substream_id) = self.outbound_substreams.keys().next().cloned() {
let outbound_info = self
.outbound_substreams
.remove(&substream_id)
.expect("The value must exist for a key");
// If the state of the connection is closing, we do not need to report this case to
// the behaviour, as the connection has just closed non-gracefully
if matches!(outbound_info.state, OutboundSubstreamState::Closing(_)) {
continue;
}

// Register this request as an RPC Error
return Poll::Ready(Some(HandlerEvent::Err(HandlerErr::Outbound {
error: RPCError::Disconnected,
proto: outbound_info.proto,
id: outbound_info.req_id,
})));
}
Poll::Ready(None)
}

fn poll(
&mut self,
cx: &mut Context<'_>,
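For context, a minimal sketch (not part of the diff) of how an application driving the network service might observe the new failure. It assumes the handler error is surfaced to the application as the NetworkEvent::RPCFailed variant used by the test further below; handle_rpc_failure is a hypothetical application callback.

    // Sketch: application event loop reacting to the error introduced above.
    loop {
        match network.next_event().await {
            // With this change, each request still in flight when a peer drops
            // the connection is reported once, with RPCError::Disconnected.
            NetworkEvent::RPCFailed { id, peer_id, error } => {
                handle_rpc_failure(peer_id, id, error); // hypothetical callback
            }
            _ => {} // other events handled elsewhere
        }
    }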
6 changes: 6 additions & 0 deletions beacon_node/lighthouse_network/src/service/mod.rs
@@ -972,6 +972,12 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
.goodbye_peer(peer_id, reason, source);
}

/// Hard (ungraceful) disconnect, for testing purposes only.
/// Use `goodbye_peer` for normal disconnections; do not use this function.
pub fn __hard_disconnect_testing_only(&mut self, peer_id: PeerId) {
let _ = self.swarm.disconnect_peer_id(peer_id);
}

/// Returns an iterator over all enr entries in the DHT.
pub fn enr_entries(&self) -> Vec<Enr> {
self.discovery().table_entries_enr()
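For contrast, a hedged sketch of the two disconnect paths (illustrative only: GoodbyeReason::Fault and ReportSource::RPC are assumed variant names, and goodbye_peer's argument list is inferred from the surrounding context):

    // Graceful path: sends a Goodbye RPC and lets the peer manager wind the
    // connection down (reason/source values assumed for illustration).
    network.goodbye_peer(&peer_id, GoodbyeReason::Fault, ReportSource::RPC);

    // Ungraceful path, tests only: drops the connection immediately. This is
    // what exercises the handler's poll_close path above, causing in-flight
    // requests to be reported back as RPCError::Disconnected.
    network.__hard_disconnect_testing_only(peer_id);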
107 changes: 107 additions & 0 deletions beacon_node/lighthouse_network/tests/rpc_tests.rs
@@ -996,6 +996,113 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
})
}

#[test]
fn test_disconnect_triggers_rpc_error() {
// Set up logging: the log level and whether logging is enabled.
let log_level = Level::Debug;
let enable_logging = true;

let log = common::build_log(log_level, enable_logging);
let spec = E::default_spec();

let rt = Arc::new(Runtime::new().unwrap());
// get sender/receiver
rt.block_on(async {
let (mut sender, mut receiver) = common::build_node_pair(
Arc::downgrade(&rt),
&log,
ForkName::Base,
&spec,
Protocol::Tcp,
)
.await;

// BlocksByRoot Request
let rpc_request = Request::BlocksByRoot(BlocksByRootRequest::new(
vec![
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
Hash256::from_low_u64_be(0),
],
&spec,
));

// build the sender future
let sender_future = async {
loop {
let ev = sender.next_event().await;
warn!(log, "EVENT {:?}", ev);
match ev {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a BlocksByRoot request
debug!(log, "Sending RPC");
sender.send_request(peer_id, 10, rpc_request.clone());
}
NetworkEvent::ResponseReceived {
peer_id: _, id: 10, ..
} => {
debug!(log, "Sender received a response");
}
NetworkEvent::RPCFailed { id, peer_id, error } => {
panic!("got RPCError");
Review comment (Member): We should return true from the future if we entered this branch and only pass the test in that case. (A sketch of this suggestion appears after the test below.)
}
_ => {} // Ignore other behaviour events
}
}
};

// Tracks the received request (PeerId, RequestId). `Some` indicates we still need to
// trigger the hard disconnect.
let mut message_info = None;
let receiver_future = async {
loop {
// This future either drives the receiving or times out, allowing the
// disconnect to be triggered during the timeout.
match futures::future::select(
Box::pin(receiver.next_event()),
Box::pin(tokio::time::sleep(Duration::from_secs(1))),
)
.await
{
futures::future::Either::Left((
NetworkEvent::RequestReceived { peer_id, id, .. },
_,
)) => {
// send the response
warn!(log, "Receiver got request");
message_info = Some((peer_id, id));
}
futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
_ => continue,
}

// If a request was received, trigger the hard disconnect here. This happens after a delay.
if message_info.is_some() {
let (peer_id, _) = message_info.as_ref().unwrap();

receiver.__hard_disconnect_testing_only(*peer_id);
debug!(log, "Disconnecting peer");
}
}
};

tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = sleep(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
})
}
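
A minimal sketch of the reviewer's suggestion above (illustrative, not part of the diff): have the sender future resolve to a bool when the RPCFailed branch fires, and pass the test only in that case.

    // Sketch: the sender future resolves to `true` only once the expected
    // RPC error is observed, instead of panicking in that branch.
    let sender_future = async {
        loop {
            match sender.next_event().await {
                NetworkEvent::PeerConnectedOutgoing(peer_id) => {
                    sender.send_request(peer_id, 10, rpc_request.clone());
                }
                // The hard disconnect should surface here as an RPC error.
                NetworkEvent::RPCFailed { .. } => break true,
                _ => {}
            }
        }
    };

    tokio::select! {
        passed = sender_future => assert!(passed, "expected an RPC error on disconnect"),
        _ = receiver_future => {}
        _ = sleep(Duration::from_secs(30)) => panic!("Future timed out"),
    }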

/// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC
/// Goodbye message.
fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) {