Skip to content

Commit

Permalink
Fix: correctly handle peer updates while waiting to reconnect (#4052)
Browse files Browse the repository at this point in the history
fix: better handling of p2p reconnection states
  • Loading branch information
msgmaxim authored Sep 28, 2023
1 parent f54fedf commit b540e42
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 48 deletions.
143 changes: 99 additions & 44 deletions engine/src/p2p/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ impl ReconnectContext {
});
}

// NOTE: we might already have a reconnection scheduled for
// this (e.g. if we reset/cancel reconnection due to receiving
// new peer info), but instead of trying to cancel it now, we
// rely on the fact that we ignore any reconnection attempts
// for peers not in `ReconnectionScheduled` state.
fn reset(&mut self, account_id: &AccountId) {
if self.reconnect_delays.remove(account_id).is_some() {
tracing::debug!("Reconnection delay for {} is reset", account_id);
Expand All @@ -173,28 +178,36 @@ impl ReconnectContext {
}
}

enum ConnectionState {
// There is a ZMQ socket for this peer (which might or might
// not be connected, but reconnection is handled by ZMQ).
Connected(ConnectedOutgoingSocket),
// There is no ZMQ socket for this peer (because we don't
// want ZMQ's default behavior yet), but we have arranged
// for a ZMQ socket to be created again in the future.
ReconnectionScheduled,
}

struct ActiveConnectionWrapper {
metric: &'static P2P_ACTIVE_CONNECTIONS,
map: BTreeMap<AccountId, ConnectedOutgoingSocket>,
/// NOTE: The mapping is from AccountId because we want to optimise for message
/// sending, which uses AccountId
map: BTreeMap<AccountId, ConnectionState>,
}

impl ActiveConnectionWrapper {
fn new() -> ActiveConnectionWrapper {
ActiveConnectionWrapper { metric: &P2P_ACTIVE_CONNECTIONS, map: Default::default() }
}
fn get(&self, account_id: &AccountId) -> Option<&ConnectedOutgoingSocket> {
fn get(&self, account_id: &AccountId) -> Option<&ConnectionState> {
self.map.get(account_id)
}
fn insert(
&mut self,
key: AccountId,
value: ConnectedOutgoingSocket,
) -> Option<ConnectedOutgoingSocket> {
fn insert(&mut self, key: AccountId, value: ConnectionState) -> Option<ConnectionState> {
let result = self.map.insert(key, value);
self.metric.set(self.map.len());
result
}
fn remove(&mut self, key: &AccountId) -> Option<ConnectedOutgoingSocket> {
fn remove(&mut self, key: &AccountId) -> Option<ConnectionState> {
let result = self.map.remove(key);
self.metric.set(self.map.len());
result
Expand All @@ -208,13 +221,8 @@ struct P2PContext {
/// A handle to the authenticator thread that can be used to make changes to the
/// list of allowed peers
authenticator: Arc<Authenticator>,
/// Contains all existing ZMQ sockets for "client" connections. Note that ZMQ socket
/// exists even when there is no internal TCP connection (e.g. before the connection
/// is established for the first time, or when ZMQ it is reconnecting). Also, when
/// our own (independent from ZMQ) reconnection mechanism kicks in, the entry is removed
/// (because we don't want ZMQ's socket behaviour).
/// NOTE: The mapping is from AccountId because we want to optimise for message
/// sending, which uses AccountId
/// Contain entries for all nodes that we *should* be connected to (i.e. all registered
/// nodes), which are either connected or scheduled for reconnection
active_connections: ActiveConnectionWrapper,
/// NOTE: this is used for incoming messages when we want to map them to account_id
/// NOTE: we don't use BTreeMap here because XPublicKey doesn't implement Ord.
Expand Down Expand Up @@ -307,6 +315,10 @@ pub(super) fn start(
(out_msg_sender, peer_update_sender, incoming_message_receiver, own_peer_info_receiver, fut)
}

fn disconnect_socket(_socket: ConnectedOutgoingSocket) {
// Simply dropping the socket is enough
}

impl P2PContext {
async fn control_loop(
mut self,
Expand Down Expand Up @@ -358,10 +370,14 @@ impl P2PContext {

fn send_message(&self, account_id: AccountId, payload: Vec<u8>) {
match self.active_connections.get(&account_id) {
Some(socket) => {
Some(ConnectionState::Connected(socket)) => {
socket.send(payload);
P2P_MSG_SENT.inc();
},
Some(ConnectionState::ReconnectionScheduled) => {
// TODO: buffer the messages and send them later?
warn!("Failed to send message. Peer is scheduled for reconnection: {account_id}");
},
None => {
warn!("Failed to send message. Peer not registered: {account_id}")
},
Expand All @@ -371,7 +387,8 @@ impl P2PContext {
fn on_peer_update(&mut self, update: PeerUpdate) {
match update {
PeerUpdate::Registered(peer_info) => self.handle_peer_update(peer_info),
PeerUpdate::Deregistered(account_id, _pubkey) => self.remove_peer(account_id),
PeerUpdate::Deregistered(account_id, _pubkey) =>
self.handle_peer_deregistration(account_id),
}
}

Expand All @@ -385,32 +402,40 @@ impl P2PContext {
}
}

fn remove_peer_and_disconnect_socket(&mut self, socket: ConnectedOutgoingSocket) {
let pubkey = &socket.peer().pubkey;
fn clean_up_for_peer_pubkey(&mut self, pubkey: &XPublicKey) {
self.authenticator.remove_peer(pubkey);
assert!(
self.x25519_to_account_id.remove(pubkey).is_some(),
"Invariant violation: pubkey must be present"
);
if self.x25519_to_account_id.remove(pubkey).is_none() {
error!("Invariant violation: pubkey must be present");
debug_assert!(false, "Invariant violation: pubkey must be present");
}
}

/// Removing a peer means: (1) removing it from the list of allowed nodes,
/// (2) disconnecting our "client" socket with that node, (3) removing
/// any references to it in local state (mappings)
fn remove_peer(&mut self, account_id: AccountId) {
fn handle_peer_deregistration(&mut self, account_id: AccountId) {
// NOTE: There is no (trivial) way to disconnect peers that are
// already connected to our listening ZMQ socket, we can only
// prevent future connections from being established and rely
// on peer from disconnecting from "client side".
// TODO: ensure that stale/inactive connections are terminated

if let Some(existing_socket) = self.active_connections.remove(&account_id) {
self.remove_peer_and_disconnect_socket(existing_socket);
match existing_socket {
ConnectionState::Connected(existing_socket) => {
disconnect_socket(existing_socket);
},
ConnectionState::ReconnectionScheduled => {
self.reconnect_context.reset(&account_id);
},
}
} else {
error!("Failed remove unknown peer: {account_id}");
}

if self.peer_infos.remove(&account_id).is_none() {
if let Some(existing_info) = self.peer_infos.remove(&account_id) {
self.clean_up_for_peer_pubkey(&existing_info.pubkey);
} else {
error!("Failed to remove peer info for unknown peer: {account_id}");
}

Expand All @@ -423,14 +448,16 @@ impl P2PContext {
fn handle_monitor_event(&mut self, event: MonitorEvent) {
match event {
MonitorEvent::ConnectionFailure(account_id) => {
let Some(_existing_socket) = self.active_connections.remove(&account_id) else {
self.reconnect_context.schedule_reconnect(account_id.clone());
if self
.active_connections
.insert(account_id.clone(), ConnectionState::ReconnectionScheduled)
.is_none()
{
// NOTE: this should not happen, but this guards against any surprising ZMQ
// behaviour
error!("Unexpected attempt to reconnect to an existing peer: {account_id}");
return
error!("Unexpected attempt to reconnect to an unknown peer: {account_id}");
};

self.reconnect_context.schedule_reconnect(account_id);
},
MonitorEvent::ConnectionSuccess(account_id) => {
self.reconnect_context.reset(&account_id);
Expand All @@ -440,16 +467,28 @@ impl P2PContext {

fn reconnect_to_peer(&mut self, account_id: &AccountId) {
if let Some(peer_info) = self.peer_infos.get(account_id) {
info!("Reconnecting to peer: {}", peer_info.account_id);

// It is possible that while we were waiting to reconnect,
// we received a peer info update and created a new "connection".
// This connection might be "healthy", but it is safer/easier to
// remove it and proceed with reconnecting.
if self.active_connections.remove(account_id).is_some() {
debug!("Reconnecting to a peer that's already connected: {}. Existing connection was removed.", account_id);
match self.active_connections.remove(account_id) {
Some(ConnectionState::ReconnectionScheduled) => {
info!("Reconnecting to peer: {}", account_id);
self.connect_to_peer(peer_info.clone());
},
Some(ConnectionState::Connected(_)) => {
// It is possible that while we were waiting to reconnect,
// we received a peer info update and created a new "connection".
// It is safe to drop the reconnection attempt even if this
// ZMQ connection is not "healthy" since reconnecting
// is now in ZMQ's hands, and it shouldn't be possible that we
// have missed any new `ConnectionFailure` event since we wouldn't
// be in `Connected` state now.
debug!(
"Reconnection attempt to {} cancelled: ZMQ socket already exists.",
account_id
);
},
None => {
debug!("Will not reconnect to now deregistered peer: {}", account_id);
},
}
self.connect_to_peer(peer_info.clone());
} else {
error!("Failed to reconnect to peer {account_id}. (Peer info not found.)");
}
Expand All @@ -464,12 +503,16 @@ impl P2PContext {

let connected_socket = socket.connect(peer);

if let Some(old_socket) = self.active_connections.insert(account_id, connected_socket) {
if self
.active_connections
.insert(account_id.clone(), ConnectionState::Connected(connected_socket))
.is_some()
{
// This should not happen because we always remove existing connection/socket
// prior to connecting, but even if it does, it should be OK to replace the
// connection (this doesn't break any invariants and the new peer info is
// likely to be more up-to-date).
warn!("Replacing existing ZMQ socket: {:?}", old_socket.peer());
error!("Unexpected existing connection while connecting to {account_id}");
}
}

Expand All @@ -489,14 +532,21 @@ impl P2PContext {
}

fn add_or_update_peer(&mut self, peer: PeerInfo) {
if let Some(existing_socket) = self.active_connections.remove(&peer.account_id) {
if let Some(existing_state) = self.active_connections.remove(&peer.account_id) {
debug!(
peer_info = peer.to_string(),
"Received info for known peer with account id {}, updating info and reconnecting",
&peer.account_id
);

self.remove_peer_and_disconnect_socket(existing_socket);
match existing_state {
ConnectionState::Connected(socket) => {
disconnect_socket(socket);
},
ConnectionState::ReconnectionScheduled => {
self.reconnect_context.reset(&peer.account_id);
},
}
} else {
debug!(
peer_info = peer.to_string(),
Expand All @@ -505,6 +555,11 @@ impl P2PContext {
);
}

// Remove any state from previous peer info in case of update:
if let Some(existing_info) = self.peer_infos.remove(&peer.account_id) {
self.clean_up_for_peer_pubkey(&existing_info.pubkey);
}

self.authenticator.add_peer(&peer);

self.peer_infos.insert(peer.account_id.clone(), peer.clone());
Expand Down
4 changes: 0 additions & 4 deletions engine/src/p2p/core/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,4 @@ impl ConnectedOutgoingSocket {
warn!("Failed to send a message to {}: {e}", self.peer.account_id,);
}
}

pub fn peer(&self) -> &PeerInfo {
&self.peer
}
}

0 comments on commit b540e42

Please sign in to comment.