Skip to content

Commit

Permalink
Improve rpc logic (#6400)
Browse files Browse the repository at this point in the history
* update rpc imports to be explicit

* avoid exposing HandlerEvent outside RPC

it's unnecessary.

* handle Pongs at RPC handler level
  • Loading branch information
jxs authored Sep 17, 2024
1 parent e0ccadb commit 2f6ad34
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 51 deletions.
24 changes: 22 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use super::methods::{GoodbyeReason, RPCCodedResponse, RPCResponseErrorCode};
use super::outbound::OutboundRequestContainer;
use super::protocol::{InboundOutput, InboundRequest, Protocol, RPCError, RPCProtocol};
use super::{RPCReceived, RPCSend, ReqId};
use super::{RPCReceived, RPCResponse, RPCSend, ReqId};
use crate::rpc::outbound::{OutboundFramed, OutboundRequest};
use crate::rpc::protocol::InboundFramed;
use fnv::FnvHashMap;
Expand All @@ -14,7 +14,8 @@ use libp2p::swarm::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
};
use libp2p::swarm::Stream;
use libp2p::swarm::{ConnectionId, Stream};
use libp2p::PeerId;
use slog::{crit, debug, trace};
use smallvec::SmallVec;
use std::{
Expand Down Expand Up @@ -88,6 +89,12 @@ pub struct RPCHandler<Id, E>
where
E: EthSpec,
{
/// This `ConnectionId`.
id: ConnectionId,

/// The matching `PeerId` of this connection.
peer_id: PeerId,

/// The upgrade for inbound substreams.
listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>,

Expand Down Expand Up @@ -218,12 +225,16 @@ where
E: EthSpec,
{
pub fn new(
id: ConnectionId,
peer_id: PeerId,
listen_protocol: SubstreamProtocol<RPCProtocol<E>, ()>,
fork_context: Arc<ForkContext>,
log: &slog::Logger,
resp_timeout: Duration,
) -> Self {
RPCHandler {
id,
peer_id,
listen_protocol,
events_out: SmallVec::new(),
dial_queue: SmallVec::new(),
Expand Down Expand Up @@ -892,6 +903,15 @@ where
self.shutdown(None);
}

// If we received a Ping, we queue a Pong response.
if let InboundRequest::Ping(ping) = req {
trace!(self.log, "Received Ping, queueing Pong";"connection_id" => %self.id, "peer_id" => %self.peer_id);
self.send_response(
self.current_inbound_substream_id,
RPCCodedResponse::Success(RPCResponse::Pong(ping)),
);
}

self.events_out.push(HandlerEvent::Ok(RPCReceived::Request(
self.current_inbound_substream_id,
req,
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub struct StatusMessage {
}

/// The PING request/response message.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
#[derive(Encode, Decode, Copy, Clone, Debug, PartialEq)]
pub struct Ping {
/// The metadata sequence number.
pub data: u64,
Expand Down
52 changes: 30 additions & 22 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub struct RPCMessage<Id, E: EthSpec> {
/// Handler managing this message.
pub conn_id: ConnectionId,
/// The message that was sent.
pub event: HandlerEvent<Id, E>,
pub message: Result<RPCReceived<Id, E>, HandlerErr<Id>>,
}

type BehaviourAction<Id, E> = ToSwarm<RPCMessage<Id, E>, RPCSend<Id, E>>;
Expand Down Expand Up @@ -245,6 +245,8 @@ where
.log
.new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string()));
let handler = RPCHandler::new(
connection_id,
peer_id,
protocol,
self.fork_context.clone(),
&log,
Expand Down Expand Up @@ -278,6 +280,8 @@ where
.new(slog::o!("peer_id" => peer_id.to_string(), "connection_id" => connection_id.to_string()));

let handler = RPCHandler::new(
connection_id,
peer_id,
protocol,
self.fork_context.clone(),
&log,
Expand Down Expand Up @@ -311,7 +315,7 @@ where
let error_msg = ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id: connection_id,
event: HandlerEvent::Err(HandlerErr::Outbound {
message: Err(HandlerErr::Outbound {
id,
proto,
error: RPCError::Disconnected,
Expand All @@ -332,7 +336,7 @@ where
*event = ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id: connection_id,
event: HandlerEvent::Err(HandlerErr::Outbound {
message: Err(HandlerErr::Outbound {
id: *request_id,
proto: req.versioned_protocol().protocol(),
error: RPCError::Disconnected,
Expand All @@ -351,16 +355,16 @@ where
event: <Self::ConnectionHandler as ConnectionHandler>::ToBehaviour,
) {
match event {
HandlerEvent::Ok(RPCReceived::Request(ref id, ref req)) => {
HandlerEvent::Ok(RPCReceived::Request(id, req)) => {
if let Some(limiter) = self.limiter.as_mut() {
// check if the request is conformant to the quota
match limiter.allows(&peer_id, req) {
match limiter.allows(&peer_id, &req) {
Ok(()) => {
// send the event to the user
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
message: Ok(RPCReceived::Request(id, req)),
}))
}
Err(RateLimitedErr::TooLarge) => {
Expand All @@ -384,7 +388,7 @@ where
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
(conn_id, id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
"Rate limited. Request too large".into(),
Expand All @@ -398,7 +402,7 @@ where
// the handler upon receiving the error code will send it back to the behaviour
self.send_response(
peer_id,
(conn_id, *id),
(conn_id, id),
RPCCodedResponse::Error(
RPCResponseErrorCode::RateLimited,
format!("Wait {:?}", wait_time).into(),
Expand All @@ -411,24 +415,31 @@ where
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
message: Ok(RPCReceived::Request(id, req)),
}))
}
}
HandlerEvent::Close(_) => {
// Handle the close event here.
self.events.push(ToSwarm::CloseConnection {
HandlerEvent::Ok(rpc) => {
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
connection: CloseConnection::All,
});
conn_id,
message: Ok(rpc),
}));
}
_ => {
HandlerEvent::Err(err) => {
self.events.push(ToSwarm::GenerateEvent(RPCMessage {
peer_id,
conn_id,
event,
message: Err(err),
}));
}
HandlerEvent::Close(_) => {
// Handle the close event here.
self.events.push(ToSwarm::CloseConnection {
peer_id,
connection: CloseConnection::All,
});
}
}
}

Expand Down Expand Up @@ -463,8 +474,8 @@ where
serializer: &mut dyn slog::Serializer,
) -> slog::Result {
serializer.emit_arguments("peer_id", &format_args!("{}", self.peer_id))?;
match &self.event {
HandlerEvent::Ok(received) => {
match &self.message {
Ok(received) => {
let (msg_kind, protocol) = match received {
RPCReceived::Request(_, req) => {
("request", req.versioned_protocol().protocol())
Expand All @@ -485,17 +496,14 @@ where
serializer.emit_str("msg_kind", msg_kind)?;
serializer.emit_arguments("protocol", &format_args!("{}", protocol))?;
}
HandlerEvent::Err(error) => {
Err(error) => {
let (msg_kind, protocol) = match &error {
HandlerErr::Inbound { proto, .. } => ("inbound_err", *proto),
HandlerErr::Outbound { proto, .. } => ("outbound_err", *proto),
};
serializer.emit_str("msg_kind", msg_kind)?;
serializer.emit_arguments("protocol", &format_args!("{}", protocol))?;
}
HandlerEvent::Close(err) => {
serializer.emit_arguments("handler_close", &format_args!("{}", err))?;
}
};

slog::Result::Ok(())
Expand Down
37 changes: 11 additions & 26 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ use crate::peer_manager::{
};
use crate::peer_manager::{MIN_OUTBOUND_ONLY_FACTOR, PEER_EXCESS_FACTOR, PRIORITY_PEER_EXCESS};
use crate::rpc::methods::MetadataRequest;
use crate::rpc::*;
use crate::rpc::{
methods, BlocksByRangeRequest, GoodbyeReason, HandlerErr, InboundRequest, NetworkParams,
OutboundRequest, Protocol, RPCCodedResponse, RPCError, RPCMessage, RPCReceived, RPCResponse,
RPCResponseErrorCode, ResponseTermination, RPC,
};
use crate::service::behaviour::BehaviourEvent;
pub use crate::service::behaviour::Gossipsub;
use crate::types::{
Expand Down Expand Up @@ -1128,16 +1132,6 @@ impl<E: EthSpec> Network<E> {
.send_request(peer_id, id, OutboundRequest::Ping(ping));
}

/// Sends a Pong response to the peer.
fn pong(&mut self, id: PeerRequestId, peer_id: PeerId) {
let ping = crate::rpc::Ping {
data: *self.network_globals.local_metadata.read().seq_number(),
};
trace!(self.log, "Sending Pong"; "request_id" => id.1, "peer_id" => %peer_id);
let event = RPCCodedResponse::Success(RPCResponse::Pong(ping));
self.eth2_rpc_mut().send_response(peer_id, id, event);
}

/// Sends a METADATA request to a peer.
fn send_meta_data_request(&mut self, peer_id: PeerId) {
let event = if self.fork_context.spec.is_peer_das_scheduled() {
Expand Down Expand Up @@ -1406,10 +1400,7 @@ impl<E: EthSpec> Network<E> {
let peer_id = event.peer_id;

// Do not permit Inbound events from peers that are being disconnected, or RPC requests.
if !self.peer_manager().is_connected(&peer_id)
&& (matches!(event.event, HandlerEvent::Err(HandlerErr::Inbound { .. }))
|| matches!(event.event, HandlerEvent::Ok(RPCReceived::Request(..))))
{
if !self.peer_manager().is_connected(&peer_id) {
debug!(
self.log,
"Ignoring rpc message of disconnecting peer";
Expand All @@ -1420,8 +1411,8 @@ impl<E: EthSpec> Network<E> {

let handler_id = event.conn_id;
// The METADATA and PING RPC responses are handled within the behaviour and not propagated
match event.event {
HandlerEvent::Err(handler_err) => {
match event.message {
Err(handler_err) => {
match handler_err {
HandlerErr::Inbound {
id: _,
Expand Down Expand Up @@ -1456,15 +1447,13 @@ impl<E: EthSpec> Network<E> {
}
}
}
HandlerEvent::Ok(RPCReceived::Request(id, request)) => {
Ok(RPCReceived::Request(id, request)) => {
let peer_request_id = (handler_id, id);
match request {
/* Behaviour managed protocols: Ping and Metadata */
InboundRequest::Ping(ping) => {
// inform the peer manager and send the response
self.peer_manager_mut().ping_request(&peer_id, ping.data);
// send a ping response
self.pong(peer_request_id, peer_id);
None
}
InboundRequest::MetaData(req) => {
Expand Down Expand Up @@ -1587,7 +1576,7 @@ impl<E: EthSpec> Network<E> {
}
}
}
HandlerEvent::Ok(RPCReceived::Response(id, resp)) => {
Ok(RPCReceived::Response(id, resp)) => {
match resp {
/* Behaviour managed protocols */
RPCResponse::Pong(ping) => {
Expand Down Expand Up @@ -1640,7 +1629,7 @@ impl<E: EthSpec> Network<E> {
),
}
}
HandlerEvent::Ok(RPCReceived::EndOfStream(id, termination)) => {
Ok(RPCReceived::EndOfStream(id, termination)) => {
let response = match termination {
ResponseTermination::BlocksByRange => Response::BlocksByRange(None),
ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None),
Expand All @@ -1651,10 +1640,6 @@ impl<E: EthSpec> Network<E> {
};
self.build_response(id, peer_id, response)
}
HandlerEvent::Close(_) => {
// NOTE: This is handled in the RPC behaviour.
None
}
}
}

Expand Down

0 comments on commit 2f6ad34

Please sign in to comment.