Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: correctly handle peer updates while waiting to reconnect #4052

Merged
merged 1 commit into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 99 additions & 44 deletions engine/src/p2p/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ impl ReconnectContext {
});
}

// NOTE: we might already have a reconnection scheduled for
// this peer (e.g. if we reset/cancel reconnection due to receiving
// new peer info), but instead of trying to cancel it now, we
// rely on the fact that we ignore any reconnection attempts
// for peers not in `ReconnectionScheduled` state.
fn reset(&mut self, account_id: &AccountId) {
if self.reconnect_delays.remove(account_id).is_some() {
tracing::debug!("Reconnection delay for {} is reset", account_id);
Expand All @@ -173,28 +178,36 @@ impl ReconnectContext {
}
}

/// State of the outgoing ("client") connection to a single peer: either a
/// ZMQ socket exists for it, or one is scheduled to be created later.
enum ConnectionState {
    /// There is a ZMQ socket for this peer (which might or might
    /// not be connected, but reconnection is handled by ZMQ).
    Connected(ConnectedOutgoingSocket),
    /// There is no ZMQ socket for this peer (because we don't
    /// want ZMQ's default behavior yet), but we have arranged
    /// for a ZMQ socket to be created again in the future.
    ReconnectionScheduled,
}

struct ActiveConnectionWrapper {
metric: &'static P2P_ACTIVE_CONNECTIONS,
map: BTreeMap<AccountId, ConnectedOutgoingSocket>,
/// NOTE: The mapping is from AccountId because we want to optimise for message
/// sending, which uses AccountId
map: BTreeMap<AccountId, ConnectionState>,
}

impl ActiveConnectionWrapper {
fn new() -> ActiveConnectionWrapper {
ActiveConnectionWrapper { metric: &P2P_ACTIVE_CONNECTIONS, map: Default::default() }
}
fn get(&self, account_id: &AccountId) -> Option<&ConnectedOutgoingSocket> {
fn get(&self, account_id: &AccountId) -> Option<&ConnectionState> {
self.map.get(account_id)
}
fn insert(
&mut self,
key: AccountId,
value: ConnectedOutgoingSocket,
) -> Option<ConnectedOutgoingSocket> {
fn insert(&mut self, key: AccountId, value: ConnectionState) -> Option<ConnectionState> {
let result = self.map.insert(key, value);
self.metric.set(self.map.len());
result
}
fn remove(&mut self, key: &AccountId) -> Option<ConnectedOutgoingSocket> {
fn remove(&mut self, key: &AccountId) -> Option<ConnectionState> {
let result = self.map.remove(key);
self.metric.set(self.map.len());
result
Expand All @@ -208,13 +221,8 @@ struct P2PContext {
/// A handle to the authenticator thread that can be used to make changes to the
/// list of allowed peers
authenticator: Arc<Authenticator>,
/// Contains all existing ZMQ sockets for "client" connections. Note that ZMQ socket
/// exists even when there is no internal TCP connection (e.g. before the connection
/// is established for the first time, or when ZMQ is reconnecting). Also, when
/// our own (independent from ZMQ) reconnection mechanism kicks in, the entry is removed
/// (because we don't want ZMQ's socket behaviour).
/// NOTE: The mapping is from AccountId because we want to optimise for message
/// sending, which uses AccountId
/// Contains entries for all nodes that we *should* be connected to (i.e. all registered
/// nodes), each of which is either connected or scheduled for reconnection
active_connections: ActiveConnectionWrapper,
/// NOTE: this is used for incoming messages when we want to map them to account_id
/// NOTE: we don't use BTreeMap here because XPublicKey doesn't implement Ord.
Expand Down Expand Up @@ -307,6 +315,10 @@ pub(super) fn start(
(out_msg_sender, peer_update_sender, incoming_message_receiver, own_peer_info_receiver, fut)
}

/// Disconnects an outgoing socket by taking ownership of it and dropping it.
///
/// No explicit teardown call is made here: simply dropping the socket is enough.
fn disconnect_socket(socket: ConnectedOutgoingSocket) {
    drop(socket);
}

impl P2PContext {
async fn control_loop(
mut self,
Expand Down Expand Up @@ -358,10 +370,14 @@ impl P2PContext {

fn send_message(&self, account_id: AccountId, payload: Vec<u8>) {
match self.active_connections.get(&account_id) {
Some(socket) => {
Some(ConnectionState::Connected(socket)) => {
socket.send(payload);
P2P_MSG_SENT.inc();
},
Some(ConnectionState::ReconnectionScheduled) => {
// TODO: buffer the messages and send them later?
warn!("Failed to send message. Peer is scheduled for reconnection: {account_id}");
},
None => {
warn!("Failed to send message. Peer not registered: {account_id}")
},
Expand All @@ -371,7 +387,8 @@ impl P2PContext {
fn on_peer_update(&mut self, update: PeerUpdate) {
match update {
PeerUpdate::Registered(peer_info) => self.handle_peer_update(peer_info),
PeerUpdate::Deregistered(account_id, _pubkey) => self.remove_peer(account_id),
PeerUpdate::Deregistered(account_id, _pubkey) =>
self.handle_peer_deregistration(account_id),
}
}

Expand All @@ -385,32 +402,40 @@ impl P2PContext {
}
}

fn remove_peer_and_disconnect_socket(&mut self, socket: ConnectedOutgoingSocket) {
let pubkey = &socket.peer().pubkey;
fn clean_up_for_peer_pubkey(&mut self, pubkey: &XPublicKey) {
self.authenticator.remove_peer(pubkey);
assert!(
self.x25519_to_account_id.remove(pubkey).is_some(),
"Invariant violation: pubkey must be present"
);
if self.x25519_to_account_id.remove(pubkey).is_none() {
error!("Invariant violation: pubkey must be present");
debug_assert!(false, "Invariant violation: pubkey must be present");
}
}

/// Removing a peer means: (1) removing it from the list of allowed nodes,
/// (2) disconnecting our "client" socket with that node, (3) removing
/// any references to it in local state (mappings)
fn remove_peer(&mut self, account_id: AccountId) {
fn handle_peer_deregistration(&mut self, account_id: AccountId) {
// NOTE: There is no (trivial) way to disconnect peers that are
// already connected to our listening ZMQ socket; we can only
// prevent future connections from being established and rely
// on the peer disconnecting from the "client side".
// TODO: ensure that stale/inactive connections are terminated

if let Some(existing_socket) = self.active_connections.remove(&account_id) {
self.remove_peer_and_disconnect_socket(existing_socket);
match existing_socket {
ConnectionState::Connected(existing_socket) => {
disconnect_socket(existing_socket);
},
ConnectionState::ReconnectionScheduled => {
self.reconnect_context.reset(&account_id);
},
}
} else {
error!("Failed remove unknown peer: {account_id}");
}

if self.peer_infos.remove(&account_id).is_none() {
if let Some(existing_info) = self.peer_infos.remove(&account_id) {
self.clean_up_for_peer_pubkey(&existing_info.pubkey);
} else {
error!("Failed to remove peer info for unknown peer: {account_id}");
}

Expand All @@ -423,14 +448,16 @@ impl P2PContext {
fn handle_monitor_event(&mut self, event: MonitorEvent) {
match event {
MonitorEvent::ConnectionFailure(account_id) => {
let Some(_existing_socket) = self.active_connections.remove(&account_id) else {
self.reconnect_context.schedule_reconnect(account_id.clone());
if self
.active_connections
.insert(account_id.clone(), ConnectionState::ReconnectionScheduled)
.is_none()
{
// NOTE: this should not happen, but this guards against any surprising ZMQ
// behaviour
error!("Unexpected attempt to reconnect to an existing peer: {account_id}");
return
error!("Unexpected attempt to reconnect to an unknown peer: {account_id}");
};

self.reconnect_context.schedule_reconnect(account_id);
},
MonitorEvent::ConnectionSuccess(account_id) => {
self.reconnect_context.reset(&account_id);
Expand All @@ -440,16 +467,28 @@ impl P2PContext {

fn reconnect_to_peer(&mut self, account_id: &AccountId) {
if let Some(peer_info) = self.peer_infos.get(account_id) {
info!("Reconnecting to peer: {}", peer_info.account_id);

// It is possible that while we were waiting to reconnect,
// we received a peer info update and created a new "connection".
// This connection might be "healthy", but it is safer/easier to
// remove it and proceed with reconnecting.
if self.active_connections.remove(account_id).is_some() {
debug!("Reconnecting to a peer that's already connected: {}. Existing connection was removed.", account_id);
match self.active_connections.remove(account_id) {
Some(ConnectionState::ReconnectionScheduled) => {
info!("Reconnecting to peer: {}", account_id);
self.connect_to_peer(peer_info.clone());
},
Some(ConnectionState::Connected(_)) => {
// It is possible that while we were waiting to reconnect,
// we received a peer info update and created a new "connection".
// It is safe to drop the reconnection attempt even if this
// ZMQ connection is not "healthy", since reconnecting
// is now in ZMQ's hands. It also shouldn't be possible that we
// have missed a new `ConnectionFailure` event, since otherwise
// we wouldn't be in `Connected` state now.
debug!(
"Reconnection attempt to {} cancelled: ZMQ socket already exists.",
account_id
);
},
None => {
debug!("Will not reconnect to now deregistered peer: {}", account_id);
},
}
self.connect_to_peer(peer_info.clone());
} else {
error!("Failed to reconnect to peer {account_id}. (Peer info not found.)");
}
Expand All @@ -464,12 +503,16 @@ impl P2PContext {

let connected_socket = socket.connect(peer);

if let Some(old_socket) = self.active_connections.insert(account_id, connected_socket) {
if self
.active_connections
.insert(account_id.clone(), ConnectionState::Connected(connected_socket))
.is_some()
{
// This should not happen because we always remove existing connection/socket
// prior to connecting, but even if it does, it should be OK to replace the
// connection (this doesn't break any invariants and the new peer info is
// likely to be more up-to-date).
warn!("Replacing existing ZMQ socket: {:?}", old_socket.peer());
error!("Unexpected existing connection while connecting to {account_id}");
}
}

Expand All @@ -489,14 +532,21 @@ impl P2PContext {
}

fn add_or_update_peer(&mut self, peer: PeerInfo) {
if let Some(existing_socket) = self.active_connections.remove(&peer.account_id) {
if let Some(existing_state) = self.active_connections.remove(&peer.account_id) {
debug!(
peer_info = peer.to_string(),
"Received info for known peer with account id {}, updating info and reconnecting",
&peer.account_id
);

self.remove_peer_and_disconnect_socket(existing_socket);
match existing_state {
ConnectionState::Connected(socket) => {
disconnect_socket(socket);
},
ConnectionState::ReconnectionScheduled => {
self.reconnect_context.reset(&peer.account_id);
},
}
} else {
debug!(
peer_info = peer.to_string(),
Expand All @@ -505,6 +555,11 @@ impl P2PContext {
);
}

// Remove any state from previous peer info in case of update:
if let Some(existing_info) = self.peer_infos.remove(&peer.account_id) {
self.clean_up_for_peer_pubkey(&existing_info.pubkey);
}

self.authenticator.add_peer(&peer);

self.peer_infos.insert(peer.account_id.clone(), peer.clone());
Expand Down
4 changes: 0 additions & 4 deletions engine/src/p2p/core/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,4 @@ impl ConnectedOutgoingSocket {
warn!("Failed to send a message to {}: {e}", self.peer.account_id,);
}
}

pub fn peer(&self) -> &PeerInfo {
&self.peer
}
}