feat: p2p stale connections #4189

Merged · 5 commits · Nov 3, 2023
99 changes: 75 additions & 24 deletions engine/src/p2p/core.rs
@@ -5,6 +5,7 @@ mod socket;
 mod tests;
 
 use std::{
+	cell::Cell,
 	collections::{BTreeMap, HashMap},
 	net::Ipv6Addr,
 	sync::Arc,
@@ -17,6 +18,7 @@ use state_chain_runtime::AccountId;
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 use tracing::{debug, error, info, info_span, trace, warn, Instrument};
 use utilities::{
+	make_periodic_tick,
 	metrics::{
 		P2P_ACTIVE_CONNECTIONS, P2P_BAD_MSG, P2P_MSG_RECEIVED, P2P_MSG_SENT, P2P_RECONNECT_PEERS,
 	},
@@ -36,6 +38,12 @@ use super::{EdPublicKey, P2PKey, XPublicKey};
 /// this somewhat short to mitigate some attacks where clients
 /// can use system resources without authenticating.
 const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(3);
+/// How long to wait until some activity on a socket (defined by a need to
+/// send a message) before deeming the connection "stale" (the state in which
+/// we drop the socket and are not actively trying to reconnect)
+pub const MAX_INACTIVITY_THRESHOLD: Duration = Duration::from_secs(60 * 60);
+/// How often to check for "stale" connections
+pub const ACTIVITY_CHECK_INTERVAL: Duration = Duration::from_secs(60);
 
 #[derive(Clone)]
 pub struct X25519KeyPair {
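The two constants introduced above drive a periodic staleness sweep, wired up further down via the repo's `make_periodic_tick` helper. A minimal sketch of the pattern, with plain `tokio` primitives substituted for the helper (whose exact behaviour is an assumption from its call site below):

```rust
use std::time::Duration;
use tokio::time::{interval, Instant};

const MAX_INACTIVITY_THRESHOLD: Duration = Duration::from_secs(60 * 60);
const ACTIVITY_CHECK_INTERVAL: Duration = Duration::from_secs(60);

// Checks every minute; a connection idle for over an hour is declared stale,
// so worst-case detection latency is roughly threshold + check interval.
async fn staleness_sweep(last_activity: Instant) {
    let mut tick = interval(ACTIVITY_CHECK_INTERVAL);
    loop {
        tick.tick().await;
        if last_activity.elapsed() > MAX_INACTIVITY_THRESHOLD {
            // In the real code: drop the ZMQ socket and mark the peer `Stale`.
            break;
        }
    }
}
```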
@@ -175,10 +183,17 @@ enum ConnectionState {
	// want ZMQ's default behavior yet), but we have arranged
	// for a ZMQ socket to be created again in the future.
	ReconnectionScheduled,
+	// There hasn't been recent interaction with the node, so we
+	// don't maintain an active connection with it. We will connect
+	// to it lazily if needed.
+	Stale,
 }
 
 struct ConnectionStateInfo {
	state: ConnectionState,
+	// Last time we received an instruction to send a message
+	// to this node
+	last_activity: Cell<tokio::time::Instant>,
	info: PeerInfo,
 }
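A note on the `Cell`: the send path looks peers up through a shared reference, so interior mutability lets the handler refresh the timestamp without a mutable borrow of the whole map entry. This is my reading of the design rather than something stated in the PR; a tiny sketch of the pattern:

```rust
use std::cell::Cell;
use tokio::time::Instant;

struct ConnectionInfo {
    last_activity: Cell<Instant>,
}

// Note: &ConnectionInfo, not &mut. `Cell::set` works through a shared
// reference, which is the point of using it here.
fn record_activity(info: &ConnectionInfo) {
    info.last_activity.set(Instant::now());
}
```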

@@ -252,10 +267,6 @@ pub(super) async fn start(
 
	zmq_context.set_max_sockets(65536).expect("should update socket limit");
 
-	// TODO: consider keeping track of "last activity" on any outgoing
-	// socket connection and disconnecting inactive peers (see proxy_expire_idle_peers
-	// in OxenMQ)
-
	let authenticator = auth::start_authentication_thread(zmq_context.clone());
 
	let (reconnect_sender, reconnect_receiver) = tokio::sync::mpsc::unbounded_channel();
@@ -307,6 +318,8 @@ impl P2PContext {
		mut monitor_event_receiver: UnboundedReceiver<MonitorEvent>,
		mut reconnect_receiver: UnboundedReceiver<AccountId>,
	) {
+		let mut check_activity_interval = make_periodic_tick(ACTIVITY_CHECK_INTERVAL, false);
+
		loop {
			tokio::select! {
				Some(messages) = outgoing_message_receiver.recv() => {
@@ -326,11 +339,14 @@
				Some(account_id) = reconnect_receiver.recv() => {
					self.reconnect_to_peer(&account_id);
				}
+				_ = check_activity_interval.tick() => {
+					self.check_activity();
+				}
			}
		}
	}
 
-	fn send_messages(&self, messages: OutgoingMultisigStageMessages) {
+	fn send_messages(&mut self, messages: OutgoingMultisigStageMessages) {
		match messages {
			OutgoingMultisigStageMessages::Broadcast(account_ids, payload) => {
				trace!("Broadcasting a message to all {} peers", account_ids.len());
@@ -347,8 +363,10 @@
		}
	}
 
-	fn send_message(&self, account_id: AccountId, payload: Vec<u8>) {
+	fn send_message(&mut self, account_id: AccountId, payload: Vec<u8>) {
		if let Some(peer) = self.active_connections.get(&account_id) {
+			peer.last_activity.set(tokio::time::Instant::now());
+
			match &peer.state {
				ConnectionState::Connected(socket) => {
					socket.send(payload);
@@ -360,6 +378,16 @@
						"Failed to send message. Peer is scheduled for reconnection: {account_id}"
					);
				},
+				ConnectionState::Stale => {
+					// Connect and try again (there is no infinite loop here
+					// since the state will be `Connected` after this)
+
+					// This is guaranteed by construction of `active_connections`:
+					assert_eq!(peer.info.account_id, account_id);
+
+					self.connect_to_peer(peer.info.clone());
+					self.send_message(account_id, payload);
+				},
			}
		} else {
			warn!("Failed to send message. Peer not registered: {account_id}")
@@ -400,7 +428,6 @@ impl P2PContext {
		// already connected to our listening ZMQ socket, we can only
		// prevent future connections from being established and rely
		// on peer from disconnecting from "client side".
-		// TODO: ensure that stale/inactive connections are terminated
 
		if account_id == self.our_account_id {
			warn!("Received peer info deregistration of our own node!");
@@ -415,6 +442,9 @@
			ConnectionState::ReconnectionScheduled => {
				self.reconnect_context.reset(&account_id);
			},
+			ConnectionState::Stale => {
+				// Nothing to do
+			},
		}
 
		self.clean_up_for_peer_pubkey(&peer.info.pubkey);
@@ -448,7 +478,7 @@
		if let Some(peer) = self.active_connections.remove(account_id) {
			match peer.state {
				ConnectionState::ReconnectionScheduled => {
-					info!("Reconnecting to peer: {}", account_id);
+					info!("Reconnecting to peer: {account_id}");
					self.connect_to_peer(peer.info.clone());
				},
				ConnectionState::Connected(_) => {
@@ -464,6 +494,12 @@
						account_id
					);
				},
+				ConnectionState::Stale => {
+					debug!(
+						"Reconnection attempt to {} cancelled: connection is stale.",
+						account_id
+					);
+				},
			}
		} else {
			debug!("Will not reconnect to now deregistered peer: {}", account_id);
@@ -479,22 +515,21 @@
 
		let connected_socket = socket.connect(peer.clone());
 
-		if self
-			.active_connections
-			.insert(
-				account_id.clone(),
-				ConnectionStateInfo {
-					state: ConnectionState::Connected(connected_socket),
-					info: peer,
-				},
-			)
-			.is_some()
-		{
-			// This should not happen because we always remove existing connection/socket
-			// prior to connecting, but even if it does, it should be OK to replace the
-			// connection (this doesn't break any invariants and the new peer info is
-			// likely to be more up-to-date).
-			error!("Unexpected existing connection while connecting to {account_id}");
+		if let Some(connection) = self.active_connections.insert(
+			account_id.clone(),
+			ConnectionStateInfo {
+				state: ConnectionState::Connected(connected_socket),
+				info: peer,
+				last_activity: Cell::new(tokio::time::Instant::now()),
+			},
+		) {
+			if !matches!(connection.state, ConnectionState::Stale) {
+				// This should not happen for non-stale sockets because we always remove
+				// existing connection/socket prior to connecting, but even if it does,
+				// it should be OK to replace the connection (this doesn't break any
+				// invariants and the new peer info is likely to be more up-to-date).
+				error!("Unexpected existing connection while connecting to {account_id}");
+			}
		}
	}

@@ -518,6 +553,9 @@
				ConnectionState::ReconnectionScheduled => {
					self.reconnect_context.reset(&peer.account_id);
				},
+				ConnectionState::Stale => {
+					// nothing to do
+				},
			}
			// Remove any state from previous peer info in case of update:
			self.clean_up_for_peer_pubkey(&existing_peer_state.info.pubkey);
@@ -588,6 +626,19 @@ impl P2PContext {
 
		incoming_message_receiver
	}
+
+	fn check_activity(&mut self) {
+		for (account_id, state) in &mut self.active_connections.map {
+			if !matches!(state.state, ConnectionState::Stale) &&
+				state.last_activity.get().elapsed() > MAX_INACTIVITY_THRESHOLD
+			{
+				debug!("Peer connection is deemed stale due to inactivity: {}", account_id);
+				self.reconnect_context.reset(account_id);
+				// ZMQ socket is dropped here
+				state.state = ConnectionState::Stale;
+			}
+		}
+	}
 }
 
 /// Unlike recv_multipart available on zmq::Socket, this collects
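Taken together, the connection lifecycle in `core.rs` now has three states. The sketch below condenses what the send path does in each state; the enum mirrors the diff, while the transition function is my paraphrase for illustration, not code from the PR:

```rust
enum ConnectionState {
    Connected,             // live ZMQ socket
    ReconnectionScheduled, // socket dropped, reconnect timer pending
    Stale,                 // socket dropped, no reconnect until needed
}

// What `send_message` effectively does with each state:
fn on_send(state: ConnectionState) -> ConnectionState {
    match state {
        // Send on the existing socket.
        ConnectionState::Connected => ConnectionState::Connected,
        // Warn and wait for the scheduled reconnect.
        ConnectionState::ReconnectionScheduled => ConnectionState::ReconnectionScheduled,
        // Lazily reconnect, then send; the retry lands in `Connected`,
        // which is why it cannot loop.
        ConnectionState::Stale => ConnectionState::Connected,
    }
}
```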
8 changes: 1 addition & 7 deletions engine/src/p2p/core/socket.rs
@@ -27,7 +27,7 @@ const CONNECTION_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(30);
 pub const DO_NOT_LINGER: i32 = 0;
 
 /// How many messages to keep in a "resend" buffer per peer
-const OUTGOING_MESSAGES_BUFFER_SIZE: i32 = 10;
+const OUTGOING_MESSAGES_BUFFER_SIZE: i32 = 100;
 
 /// Socket to be used for connecting to peer on the network
 pub struct OutgoingSocket {
@@ -41,12 +41,6 @@ impl OutgoingSocket {
		// Discard any pending messages when disconnecting a socket
		socket.set_linger(DO_NOT_LINGER).unwrap();
 
-		// Do not buffer outgoing messages before a connection is
-		// established (this means the messages will be lost, but
-		// this prevents us from buffering messages to peers that
-		// are misconfigured, for example).
-		socket.set_immediate(true).unwrap();
-
		// Buffer at most OUTGOING_MESSAGES_BUFFER_SIZE messages
		// per peer (this minimises how much memory we might "leak"
		// if they never come online again).
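For context on the two socket options touched above, here is a sketch using the `zmq` crate (values and setup order are illustrative, not the engine's exact constructor). Removing `ZMQ_IMMEDIATE` means messages sent before a (re)connection completes are queued up to the send high-water mark instead of being dropped, which is what lets a lazily revived stale connection deliver the very message that triggered it; the raised buffer size bounds how much can queue per peer:

```rust
fn configure_outgoing_socket(ctx: &zmq::Context) -> zmq::Socket {
    let socket = ctx.socket(zmq::DEALER).unwrap();

    // Drop any still-queued messages as soon as the socket is closed.
    socket.set_linger(0).unwrap();

    // Queue at most 100 outgoing messages per peer. Without ZMQ_IMMEDIATE,
    // messages sent while (re)connecting are buffered up to this limit
    // rather than discarded.
    socket.set_sndhwm(100).unwrap();

    socket
}
```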
38 changes: 19 additions & 19 deletions engine/src/p2p/core/tests.rs
@@ -1,5 +1,8 @@
 use super::{PeerInfo, PeerUpdate};
-use crate::p2p::{OutgoingMultisigStageMessages, P2PKey};
+use crate::p2p::{
+	core::{ACTIVITY_CHECK_INTERVAL, MAX_INACTIVITY_THRESHOLD},
+	OutgoingMultisigStageMessages, P2PKey,
+};
 use sp_core::ed25519::Public;
 use state_chain_runtime::AccountId;
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
@@ -196,30 +199,27 @@ async fn can_connect_after_pubkey_change() {
	send_and_receive_message(&node1, &mut node2b).await.unwrap();
 }
 
-/// Test the behaviour around receiving own registration: at first, if our node
-/// is not registered, we delay connecting to other nodes; once we receive our
-/// own registration, we connect to other registered nodes.
-#[tokio::test]
-async fn connects_after_registration() {
+#[tokio::test(start_paused = true)]
+async fn stale_connections() {
	let node_key1 = create_keypair();
	let node_key2 = create_keypair();
 
-	let pi1 = create_node_info(AccountId::new([1; 32]), &node_key1, 8092);
-	let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2, 8093);
+	let pi1 = create_node_info(AccountId::new([1; 32]), &node_key1, 8094);
+	let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2, 8095);
 
-	// Node 1 doesn't get its own peer info at first and will wait for registration
-	let node1 = spawn_node(&node_key1, 0, pi1.clone(), &[pi2.clone()]);
+	let mut node1 = spawn_node(&node_key1, 0, pi1.clone(), &[pi1.clone(), pi2.clone()]);
	let mut node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);
 
-	// For sanity, check that node 1 can't yet communicate with node 2:
-	assert!(send_and_receive_message(&node1, &mut node2).await.is_none());
+	// Sleep long enough for nodes to deem connections "stale" (due to inactivity)
+	tokio::time::sleep(
+		ACTIVITY_CHECK_INTERVAL + MAX_INACTIVITY_THRESHOLD + std::time::Duration::from_secs(1),
+	)
+	.await;
 
-	// Update node 1 with its own peer info
-	node1.peer_update_sender.send(PeerUpdate::Registered(pi1.clone())).unwrap();
+	// Resuming is necessary for timeouts to work correctly
+	tokio::time::resume();
 
	// Allow some time for the above command to propagate through the channel
	tokio::time::sleep(std::time::Duration::from_millis(100)).await;
 
-	// It should now be able to communicate with node 2:
-	assert!(send_and_receive_message(&node1, &mut node2).await.is_some());
+	// Ensure that we can re-activate stale connections when needed
+	send_and_receive_message(&node1, &mut node2).await.unwrap();
+	send_and_receive_message(&node2, &mut node1).await.unwrap();
 }