Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: p2p stale connections #4189

Merged
merged 5 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 57 additions & 7 deletions engine/src/p2p/core.rs
j4m1ef0rd marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod socket;
mod tests;

use std::{
cell::Cell,
collections::{BTreeMap, HashMap},
net::Ipv6Addr,
sync::Arc,
Expand All @@ -17,6 +18,7 @@ use state_chain_runtime::AccountId;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use utilities::{
make_periodic_tick,
metrics::{
P2P_ACTIVE_CONNECTIONS, P2P_BAD_MSG, P2P_MSG_RECEIVED, P2P_MSG_SENT, P2P_RECONNECT_PEERS,
},
Expand All @@ -36,6 +38,12 @@ use super::{EdPublicKey, P2PKey, XPublicKey};
/// this somewhat short to mitigate some attacks where clients
/// can use system resources without authenticating.
const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(3);
/// How long to wait until some activity on a socket (defined by a need to
/// send a message) before deeming the connection "stale" (the state in which
/// we drop the socket and are not actively trying to reconnect)
pub const MAX_INACTIVITY_THRESHOLD: Duration = Duration::from_secs(60 * 60);
/// How often to check for "stale" connections
pub const ACTIVITY_CHECK_INTERVAL: Duration = Duration::from_secs(60);

#[derive(Clone)]
pub struct X25519KeyPair {
Expand Down Expand Up @@ -175,10 +183,17 @@ enum ConnectionState {
// want ZMQ's default behavior yet), but we have arranged
// for a ZMQ socket to be created again in the future.
ReconnectionScheduled,
// There hasn't been recent interaction with the node, so we
// don't maintain an active connection with it. We will connect
// to it lazily if needed.
Stale,
}

struct ConnectionStateInfo {
state: ConnectionState,
// Last time we received an instruction to send a message
kylezs marked this conversation as resolved.
Show resolved Hide resolved
// to this node
last_activity: Cell<tokio::time::Instant>,
info: PeerInfo,
}

Expand Down Expand Up @@ -252,10 +267,6 @@ pub(super) async fn start(

zmq_context.set_max_sockets(65536).expect("should update socket limit");

// TODO: consider keeping track of "last activity" on any outgoing
// socket connection and disconnecting inactive peers (see proxy_expire_idle_peers
// in OxenMQ)

let authenticator = auth::start_authentication_thread(zmq_context.clone());

let (reconnect_sender, reconnect_receiver) = tokio::sync::mpsc::unbounded_channel();
Expand Down Expand Up @@ -307,6 +318,8 @@ impl P2PContext {
mut monitor_event_receiver: UnboundedReceiver<MonitorEvent>,
mut reconnect_receiver: UnboundedReceiver<AccountId>,
) {
let mut check_activity_interval = make_periodic_tick(ACTIVITY_CHECK_INTERVAL, false);

loop {
tokio::select! {
Some(messages) = outgoing_message_receiver.recv() => {
Expand All @@ -326,11 +339,14 @@ impl P2PContext {
Some(account_id) = reconnect_receiver.recv() => {
self.reconnect_to_peer(&account_id);
}
_ = check_activity_interval.tick() => {
self.check_activity();
}
}
}
}

fn send_messages(&self, messages: OutgoingMultisigStageMessages) {
fn send_messages(&mut self, messages: OutgoingMultisigStageMessages) {
match messages {
OutgoingMultisigStageMessages::Broadcast(account_ids, payload) => {
trace!("Broadcasting a message to all {} peers", account_ids.len());
Expand All @@ -347,8 +363,10 @@ impl P2PContext {
}
}

fn send_message(&self, account_id: AccountId, payload: Vec<u8>) {
fn send_message(&mut self, account_id: AccountId, payload: Vec<u8>) {
if let Some(peer) = self.active_connections.get(&account_id) {
peer.last_activity.set(tokio::time::Instant::now());

match &peer.state {
ConnectionState::Connected(socket) => {
socket.send(payload);
Expand All @@ -360,6 +378,12 @@ impl P2PContext {
"Failed to send message. Peer is scheduled for reconnection: {account_id}"
);
},
ConnectionState::Stale => {
// Connect and try again (there is no infinite loop here
// since the state will be `Connected` after this)
self.connect_to_peer(peer.info.clone());
kylezs marked this conversation as resolved.
Show resolved Hide resolved
self.send_message(account_id, payload);
j4m1ef0rd marked this conversation as resolved.
Show resolved Hide resolved
},
}
} else {
warn!("Failed to send message. Peer not registered: {account_id}")
Expand Down Expand Up @@ -415,6 +439,9 @@ impl P2PContext {
ConnectionState::ReconnectionScheduled => {
self.reconnect_context.reset(&account_id);
},
ConnectionState::Stale => {
// Nothing to do
},
}

self.clean_up_for_peer_pubkey(&peer.info.pubkey);
Expand Down Expand Up @@ -448,7 +475,7 @@ impl P2PContext {
if let Some(peer) = self.active_connections.remove(account_id) {
match peer.state {
ConnectionState::ReconnectionScheduled => {
info!("Reconnecting to peer: {}", account_id);
info!("Reconnecting to peer: {account_id}");
self.connect_to_peer(peer.info.clone());
},
ConnectionState::Connected(_) => {
Expand All @@ -464,6 +491,12 @@ impl P2PContext {
account_id
);
},
ConnectionState::Stale => {
debug!(
"Reconnection attempt to {} cancelled: connection is stale.",
account_id
);
},
}
} else {
debug!("Will not reconnect to now deregistered peer: {}", account_id);
Expand All @@ -486,6 +519,7 @@ impl P2PContext {
ConnectionStateInfo {
state: ConnectionState::Connected(connected_socket),
info: peer,
last_activity: Cell::new(tokio::time::Instant::now()),
},
)
.is_some()
j4m1ef0rd marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -518,6 +552,9 @@ impl P2PContext {
ConnectionState::ReconnectionScheduled => {
self.reconnect_context.reset(&peer.account_id);
},
ConnectionState::Stale => {
// nothing to do
},
}
// Remove any state from previous peer info in case of update:
self.clean_up_for_peer_pubkey(&existing_peer_state.info.pubkey);
Expand Down Expand Up @@ -588,6 +625,19 @@ impl P2PContext {

incoming_message_receiver
}

fn check_activity(&mut self) {
for (account_id, state) in &mut self.active_connections.map {
if !matches!(state.state, ConnectionState::Stale) &&
state.last_activity.get().elapsed() > MAX_INACTIVITY_THRESHOLD
{
debug!("Peer connection is deemed stale due to inactivity: {}", account_id);
self.reconnect_context.reset(account_id);
// ZMQ socket is dropped here
state.state = ConnectionState::Stale;
}
}
}
}

/// Unlike recv_multipart available on zmq::Socket, this collects
Expand Down
6 changes: 0 additions & 6 deletions engine/src/p2p/core/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@ impl OutgoingSocket {
// Discard any pending messages when disconnecting a socket
socket.set_linger(DO_NOT_LINGER).unwrap();

// Do not buffer outgoing messages before a connection is
// established (this means the messages will be lost, but
// this prevents us from buffering messages to peers that
// are misconfigured, for example).
socket.set_immediate(true).unwrap();

// Buffer at most OUTGOING_MESSAGES_BUFFER_SIZE messages
// per peer (this minimises how much memory we might "leak"
// if they never come online again).
Expand Down
30 changes: 29 additions & 1 deletion engine/src/p2p/core/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use super::{PeerInfo, PeerUpdate};
use crate::p2p::{OutgoingMultisigStageMessages, P2PKey};
use crate::p2p::{
core::{ACTIVITY_CHECK_INTERVAL, MAX_INACTIVITY_THRESHOLD},
OutgoingMultisigStageMessages, P2PKey,
};
use sp_core::ed25519::Public;
use state_chain_runtime::AccountId;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
Expand Down Expand Up @@ -223,3 +226,28 @@ async fn connects_after_registration() {
// It should now be able to communicate with node 2:
assert!(send_and_receive_message(&node1, &mut node2).await.is_some());
}

#[tokio::test(start_paused = true)]
async fn stale_connections() {
let node_key1 = create_keypair();
let node_key2 = create_keypair();

let pi1 = create_node_info(AccountId::new([1; 32]), &node_key1, 8094);
let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2, 8095);

let mut node1 = spawn_node(&node_key1, 0, pi1.clone(), &[pi1.clone(), pi2.clone()]);
let mut node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);

// Sleep long enough for nodes to deem connections "stale" (due to inactivity)
tokio::time::sleep(
ACTIVITY_CHECK_INTERVAL + MAX_INACTIVITY_THRESHOLD + std::time::Duration::from_secs(1),
)
.await;

// Resuming is necessary for timeouts to work correctly
tokio::time::resume();

// Ensure that we can re-activate stale connections when needed
send_and_receive_message(&node1, &mut node2).await.unwrap();
send_and_receive_message(&node2, &mut node1).await.unwrap();
}
Loading