diff --git a/engine/src/p2p/core.rs b/engine/src/p2p/core.rs index 3f6eb774fa..c8d2238dcd 100644 --- a/engine/src/p2p/core.rs +++ b/engine/src/p2p/core.rs @@ -165,6 +165,11 @@ impl ReconnectContext { }); } + // NOTE: we might already have a reconnection scheduled for + // this (e.g. if we reset/cancel reconnection due to receiving + // new peer info), but instead of trying to cancel it now, we + // rely on the fact that we ignore any reconnection attempts + // for peers not in `ReconnectionScheduled` state. fn reset(&mut self, account_id: &AccountId) { if self.reconnect_delays.remove(account_id).is_some() { tracing::debug!("Reconnection delay for {} is reset", account_id); @@ -173,28 +178,36 @@ impl ReconnectContext { } } +enum ConnectionState { + // There is a ZMQ socket for this peer (which might or might + // not be connected, but reconnection is handled by ZMQ). + Connected(ConnectedOutgoingSocket), + // There is no ZMQ socket for this peer (because we don't + // want ZMQ's default behavior yet), but we have arranged + // for a ZMQ socket to be created again in the future. + ReconnectionScheduled, +} + struct ActiveConnectionWrapper { metric: &'static P2P_ACTIVE_CONNECTIONS, - map: BTreeMap, + /// NOTE: The mapping is from AccountId because we want to optimise for message + /// sending, which uses AccountId + map: BTreeMap, } impl ActiveConnectionWrapper { fn new() -> ActiveConnectionWrapper { ActiveConnectionWrapper { metric: &P2P_ACTIVE_CONNECTIONS, map: Default::default() } } - fn get(&self, account_id: &AccountId) -> Option<&ConnectedOutgoingSocket> { + fn get(&self, account_id: &AccountId) -> Option<&ConnectionState> { self.map.get(account_id) } - fn insert( - &mut self, - key: AccountId, - value: ConnectedOutgoingSocket, - ) -> Option { + fn insert(&mut self, key: AccountId, value: ConnectionState) -> Option { let result = self.map.insert(key, value); self.metric.set(self.map.len()); result } - fn remove(&mut self, key: &AccountId) -> Option { + fn remove(&mut self, key: &AccountId) -> Option { let result = self.map.remove(key); self.metric.set(self.map.len()); result @@ -208,13 +221,8 @@ struct P2PContext { /// A handle to the authenticator thread that can be used to make changes to the /// list of allowed peers authenticator: Arc, - /// Contains all existing ZMQ sockets for "client" connections. Note that ZMQ socket - /// exists even when there is no internal TCP connection (e.g. before the connection - /// is established for the first time, or when ZMQ it is reconnecting). Also, when - /// our own (independent from ZMQ) reconnection mechanism kicks in, the entry is removed - /// (because we don't want ZMQ's socket behaviour). - /// NOTE: The mapping is from AccountId because we want to optimise for message - /// sending, which uses AccountId + /// Contain entries for all nodes that we *should* be connected to (i.e. all registered + /// nodes), which are either connected or scheduled for reconnection active_connections: ActiveConnectionWrapper, /// NOTE: this is used for incoming messages when we want to map them to account_id /// NOTE: we don't use BTreeMap here because XPublicKey doesn't implement Ord. @@ -307,6 +315,10 @@ pub(super) fn start( (out_msg_sender, peer_update_sender, incoming_message_receiver, own_peer_info_receiver, fut) } +fn disconnect_socket(_socket: ConnectedOutgoingSocket) { + // Simply dropping the socket is enough +} + impl P2PContext { async fn control_loop( mut self, @@ -358,10 +370,14 @@ impl P2PContext { fn send_message(&self, account_id: AccountId, payload: Vec) { match self.active_connections.get(&account_id) { - Some(socket) => { + Some(ConnectionState::Connected(socket)) => { socket.send(payload); P2P_MSG_SENT.inc(); }, + Some(ConnectionState::ReconnectionScheduled) => { + // TODO: buffer the messages and send them later? + warn!("Failed to send message. Peer is scheduled for reconnection: {account_id}"); + }, None => { warn!("Failed to send message. Peer not registered: {account_id}") }, @@ -371,7 +387,8 @@ impl P2PContext { fn on_peer_update(&mut self, update: PeerUpdate) { match update { PeerUpdate::Registered(peer_info) => self.handle_peer_update(peer_info), - PeerUpdate::Deregistered(account_id, _pubkey) => self.remove_peer(account_id), + PeerUpdate::Deregistered(account_id, _pubkey) => + self.handle_peer_deregistration(account_id), } } @@ -385,19 +402,18 @@ impl P2PContext { } } - fn remove_peer_and_disconnect_socket(&mut self, socket: ConnectedOutgoingSocket) { - let pubkey = &socket.peer().pubkey; + fn clean_up_for_peer_pubkey(&mut self, pubkey: &XPublicKey) { self.authenticator.remove_peer(pubkey); - assert!( - self.x25519_to_account_id.remove(pubkey).is_some(), - "Invariant violation: pubkey must be present" - ); + if self.x25519_to_account_id.remove(pubkey).is_none() { + error!("Invariant violation: pubkey must be present"); + debug_assert!(false, "Invariant violation: pubkey must be present"); + } } /// Removing a peer means: (1) removing it from the list of allowed nodes, /// (2) disconnecting our "client" socket with that node, (3) removing /// any references to it in local state (mappings) - fn remove_peer(&mut self, account_id: AccountId) { + fn handle_peer_deregistration(&mut self, account_id: AccountId) { // NOTE: There is no (trivial) way to disconnect peers that are // already connected to our listening ZMQ socket, we can only // prevent future connections from being established and rely @@ -405,12 +421,21 @@ impl P2PContext { // TODO: ensure that stale/inactive connections are terminated if let Some(existing_socket) = self.active_connections.remove(&account_id) { - self.remove_peer_and_disconnect_socket(existing_socket); + match existing_socket { + ConnectionState::Connected(existing_socket) => { + disconnect_socket(existing_socket); + }, + ConnectionState::ReconnectionScheduled => { + self.reconnect_context.reset(&account_id); + }, + } } else { error!("Failed remove unknown peer: {account_id}"); } - if self.peer_infos.remove(&account_id).is_none() { + if let Some(existing_info) = self.peer_infos.remove(&account_id) { + self.clean_up_for_peer_pubkey(&existing_info.pubkey); + } else { error!("Failed to remove peer info for unknown peer: {account_id}"); } @@ -423,14 +448,16 @@ impl P2PContext { fn handle_monitor_event(&mut self, event: MonitorEvent) { match event { MonitorEvent::ConnectionFailure(account_id) => { - let Some(_existing_socket) = self.active_connections.remove(&account_id) else { + self.reconnect_context.schedule_reconnect(account_id.clone()); + if self + .active_connections + .insert(account_id.clone(), ConnectionState::ReconnectionScheduled) + .is_none() + { // NOTE: this should not happen, but this guards against any surprising ZMQ // behaviour - error!("Unexpected attempt to reconnect to an existing peer: {account_id}"); - return + error!("Unexpected attempt to reconnect to an unknown peer: {account_id}"); }; - - self.reconnect_context.schedule_reconnect(account_id); }, MonitorEvent::ConnectionSuccess(account_id) => { self.reconnect_context.reset(&account_id); @@ -440,16 +467,28 @@ impl P2PContext { fn reconnect_to_peer(&mut self, account_id: &AccountId) { if let Some(peer_info) = self.peer_infos.get(account_id) { - info!("Reconnecting to peer: {}", peer_info.account_id); - - // It is possible that while we were waiting to reconnect, - // we received a peer info update and created a new "connection". - // This connection might be "healthy", but it is safer/easier to - // remove it and proceed with reconnecting. - if self.active_connections.remove(account_id).is_some() { - debug!("Reconnecting to a peer that's already connected: {}. Existing connection was removed.", account_id); + match self.active_connections.remove(account_id) { + Some(ConnectionState::ReconnectionScheduled) => { + info!("Reconnecting to peer: {}", account_id); + self.connect_to_peer(peer_info.clone()); + }, + Some(ConnectionState::Connected(_)) => { + // It is possible that while we were waiting to reconnect, + // we received a peer info update and created a new "connection". + // It is safe to drop the reconnection attempt even if this + // ZMQ connection is not "healthy" since reconnecting + // is now in ZMQ's hands, and it shouldn't be possible that we + // have missed any new `ConnectionFailure` event since we wouldn't + // be in `Connected` state now. + debug!( + "Reconnection attempt to {} cancelled: ZMQ socket already exists.", + account_id + ); + }, + None => { + debug!("Will not reconnect to now deregistered peer: {}", account_id); + }, } - self.connect_to_peer(peer_info.clone()); } else { error!("Failed to reconnect to peer {account_id}. (Peer info not found.)"); } @@ -464,12 +503,16 @@ impl P2PContext { let connected_socket = socket.connect(peer); - if let Some(old_socket) = self.active_connections.insert(account_id, connected_socket) { + if self + .active_connections + .insert(account_id.clone(), ConnectionState::Connected(connected_socket)) + .is_some() + { // This should not happen because we always remove existing connection/socket // prior to connecting, but even if it does, it should be OK to replace the // connection (this doesn't break any invariants and the new peer info is // likely to be more up-to-date). - warn!("Replacing existing ZMQ socket: {:?}", old_socket.peer()); + error!("Unexpected existing connection while connecting to {account_id}"); } } @@ -489,14 +532,21 @@ impl P2PContext { } fn add_or_update_peer(&mut self, peer: PeerInfo) { - if let Some(existing_socket) = self.active_connections.remove(&peer.account_id) { + if let Some(existing_state) = self.active_connections.remove(&peer.account_id) { debug!( peer_info = peer.to_string(), "Received info for known peer with account id {}, updating info and reconnecting", &peer.account_id ); - self.remove_peer_and_disconnect_socket(existing_socket); + match existing_state { + ConnectionState::Connected(socket) => { + disconnect_socket(socket); + }, + ConnectionState::ReconnectionScheduled => { + self.reconnect_context.reset(&peer.account_id); + }, + } } else { debug!( peer_info = peer.to_string(), @@ -505,6 +555,11 @@ impl P2PContext { ); } + // Remove any state from previous peer info in case of update: + if let Some(existing_info) = self.peer_infos.remove(&peer.account_id) { + self.clean_up_for_peer_pubkey(&existing_info.pubkey); + } + self.authenticator.add_peer(&peer); self.peer_infos.insert(peer.account_id.clone(), peer.clone()); diff --git a/engine/src/p2p/core/socket.rs b/engine/src/p2p/core/socket.rs index 16c21e40a9..455cae1559 100644 --- a/engine/src/p2p/core/socket.rs +++ b/engine/src/p2p/core/socket.rs @@ -107,8 +107,4 @@ impl ConnectedOutgoingSocket { warn!("Failed to send a message to {}: {e}", self.peer.account_id,); } } - - pub fn peer(&self) -> &PeerInfo { - &self.peer - } }