From 771bda5c52807e85fd75d72c4488fe1193228ad8 Mon Sep 17 00:00:00 2001 From: Maxim Shishmarev Date: Tue, 31 Oct 2023 12:07:41 +1100 Subject: [PATCH] Feat: wait for registration before starting p2p (#4160) * feat: wait for peer info registration before starting p2p * refactor: remove registration status from p2p core * refactor: unify peer info and connection state maps * fix: p2p registration does not delay other modules * fix: use more accurate block hash to initialise p2p * refactor: use until finalized for submitting peer info * fix: ingnore own deregistration * feat: p2p signals when ready * chore: remove unnecessary option --- engine/src/main.rs | 3 + engine/src/p2p.rs | 65 +++--- engine/src/p2p/core.rs | 210 ++++++++------------ engine/src/p2p/core/tests.rs | 30 ++- engine/src/p2p/peer_info_submitter.rs | 102 ++++------ state-chain/pallets/cf-validator/src/lib.rs | 5 +- 6 files changed, 192 insertions(+), 223 deletions(-) diff --git a/engine/src/main.rs b/engine/src/main.rs index 6b76a8c0df..e09ac81a40 100644 --- a/engine/src/main.rs +++ b/engine/src/main.rs @@ -211,6 +211,7 @@ async fn run_main( btc_outgoing_sender, btc_incoming_receiver, peer_update_sender, + p2p_ready_receiver, p2p_fut, ) = p2p::start( state_chain_client.clone(), @@ -336,6 +337,8 @@ async fn run_main( peer_update_sender, )); + p2p_ready_receiver.await.unwrap(); + has_completed_initialising.store(true, std::sync::atomic::Ordering::Relaxed); Ok(()) diff --git a/engine/src/p2p.rs b/engine/src/p2p.rs index 3abecfa307..21a066b150 100644 --- a/engine/src/p2p.rs +++ b/engine/src/p2p.rs @@ -12,7 +12,7 @@ use crate::{ p2p::core::ed25519_secret_key_to_x25519_secret_key, settings::P2P as P2PSettings, state_chain_observer::client::{ - extrinsic_api::signed::SignedExtrinsicApi, storage_api::StorageApi, + chain_api::ChainApi, extrinsic_api::signed::SignedExtrinsicApi, storage_api::StorageApi, }, }; @@ -29,7 +29,10 @@ use futures::{Future, FutureExt}; use multisig::p2p::OutgoingMultisigStageMessages; use muxer::P2PMuxer; use sp_core::{ed25519, H256}; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + oneshot, +}; use tracing::{info_span, Instrument}; use zeroize::Zeroizing; @@ -87,7 +90,7 @@ fn pk_to_string(pk: &XPublicKey) -> String { pub async fn start( state_chain_client: Arc, settings: P2PSettings, - latest_block_hash: H256, + initial_block_hash: H256, ) -> anyhow::Result<( MultisigMessageSender, MultisigMessageReceiver, @@ -96,10 +99,11 @@ pub async fn start( MultisigMessageSender, MultisigMessageReceiver, UnboundedSender, + oneshot::Receiver<()>, impl Future>, )> where - StateChainClient: StorageApi + SignedExtrinsicApi + 'static + Send + Sync, + StateChainClient: StorageApi + SignedExtrinsicApi + ChainApi + 'static + Send + Sync, { if settings.ip_address == IpAddr::V4(Ipv4Addr::UNSPECIFIED) { anyhow::bail!("Should provide a valid IP address"); @@ -123,20 +127,22 @@ where }; let current_peers = - peer_info_submitter::get_current_peer_infos(&state_chain_client, latest_block_hash) + peer_info_submitter::get_current_peer_infos(&state_chain_client, initial_block_hash) .await .context("Failed to get initial peer info")?; let our_account_id = state_chain_client.account_id(); let own_peer_info = current_peers.iter().find(|pi| pi.account_id == our_account_id).cloned(); - let ( - outgoing_message_sender, - peer_update_sender, - incoming_message_receiver, - own_peer_info_receiver, - p2p_fut, - ) = core::start(&node_key, settings.port, current_peers, our_account_id); + let (incoming_message_sender, incoming_message_receiver) = + tokio::sync::mpsc::unbounded_channel(); + + let (outgoing_message_sender, outgoing_message_receiver) = + tokio::sync::mpsc::unbounded_channel(); + + let (peer_update_sender, peer_update_receiver) = tokio::sync::mpsc::unbounded_channel(); + + let (p2p_ready_sender, p2p_ready_receiver) = oneshot::channel(); let ( eth_outgoing_sender, @@ -150,22 +156,32 @@ where let fut = task_scope(move |scope| { async move { - scope.spawn(async { - p2p_fut.await; - Ok(()) - }); - - scope.spawn( - peer_info_submitter::start( - node_key, - state_chain_client, + scope.spawn(async move { + peer_info_submitter::ensure_peer_info_registered( + &node_key, + &state_chain_client, settings.ip_address, settings.port, own_peer_info, - own_peer_info_receiver, ) - .instrument(info_span!("P2PClient")), - ); + .instrument(info_span!("P2PClient")) + .await?; + + p2p_ready_sender.send(()).unwrap(); + + core::start( + node_key, + settings.port, + current_peers, + our_account_id, + incoming_message_sender, + outgoing_message_receiver, + peer_update_receiver, + ) + .await; + + Ok(()) + }); scope.spawn(async move { muxer_future.await; @@ -185,6 +201,7 @@ where btc_outgoing_sender, btc_incoming_receiver, peer_update_sender, + p2p_ready_receiver, fut, )) } diff --git a/engine/src/p2p/core.rs b/engine/src/p2p/core.rs index c8d2238dcd..0ae57f8e88 100644 --- a/engine/src/p2p/core.rs +++ b/engine/src/p2p/core.rs @@ -6,7 +6,6 @@ mod tests; use std::{ collections::{BTreeMap, HashMap}, - future::Future, net::Ipv6Addr, sync::Arc, time::Duration, @@ -89,16 +88,6 @@ impl std::fmt::Display for PeerInfo { } } -/// Used to track "registration" status on the network -enum RegistrationStatus { - /// The node is not yet known to the network (its peer info - /// may not be known to the network yet) - Pending, - /// The node is registered, i.e. its peer info has been - /// recorded/updated - Registered, -} - pub fn ed25519_secret_key_to_x25519_secret_key( ed25519_sk: &ed25519_dalek::SecretKey, ) -> x25519_dalek::StaticSecret { @@ -188,26 +177,36 @@ enum ConnectionState { ReconnectionScheduled, } +struct ConnectionStateInfo { + state: ConnectionState, + info: PeerInfo, +} + struct ActiveConnectionWrapper { metric: &'static P2P_ACTIVE_CONNECTIONS, - /// NOTE: The mapping is from AccountId because we want to optimise for message - /// sending, which uses AccountId - map: BTreeMap, + map: BTreeMap, } impl ActiveConnectionWrapper { fn new() -> ActiveConnectionWrapper { ActiveConnectionWrapper { metric: &P2P_ACTIVE_CONNECTIONS, map: Default::default() } } - fn get(&self, account_id: &AccountId) -> Option<&ConnectionState> { + fn get(&self, account_id: &AccountId) -> Option<&ConnectionStateInfo> { self.map.get(account_id) } - fn insert(&mut self, key: AccountId, value: ConnectionState) -> Option { + fn get_mut(&mut self, account_id: &AccountId) -> Option<&mut ConnectionStateInfo> { + self.map.get_mut(account_id) + } + fn insert( + &mut self, + key: AccountId, + value: ConnectionStateInfo, + ) -> Option { let result = self.map.insert(key, value); self.metric.set(self.map.len()); result } - fn remove(&mut self, key: &AccountId) -> Option { + fn remove(&mut self, key: &AccountId) -> Option { let result = self.map.remove(key); self.metric.set(self.map.len()); result @@ -227,32 +226,25 @@ struct P2PContext { /// NOTE: this is used for incoming messages when we want to map them to account_id /// NOTE: we don't use BTreeMap here because XPublicKey doesn't implement Ord. x25519_to_account_id: HashMap, - peer_infos: BTreeMap, /// Channel through which we send incoming messages to the multisig incoming_message_sender: UnboundedSender<(AccountId, Vec)>, - own_peer_info_sender: UnboundedSender, reconnect_context: ReconnectContext, /// This is how we communicate with the "monitor" thread monitor_handle: monitor::MonitorHandle, - /// Our own "registration" status on the network - status: RegistrationStatus, our_account_id: AccountId, /// NOTE: zmq context is intentionally declared at the bottom of the struct /// to ensure its destructor is called after that of any zmq sockets zmq_context: zmq::Context, } -pub(super) fn start( - p2p_key: &P2PKey, +pub(super) async fn start( + p2p_key: P2PKey, port: Port, current_peers: Vec, our_account_id: AccountId, -) -> ( - UnboundedSender, - UnboundedSender, - UnboundedReceiver<(AccountId, Vec)>, - UnboundedReceiver, - impl Future, + incoming_message_sender: UnboundedSender<(AccountId, Vec)>, + outgoing_message_receiver: UnboundedReceiver, + peer_update_receiver: UnboundedReceiver, ) { debug!("Our derived x25519 pubkey: {}", pk_to_string(&p2p_key.encryption_key.public_key)); @@ -266,53 +258,40 @@ pub(super) fn start( let authenticator = auth::start_authentication_thread(zmq_context.clone()); - let (incoming_message_sender, incoming_message_receiver) = - tokio::sync::mpsc::unbounded_channel(); - let (reconnect_sender, reconnect_receiver) = tokio::sync::mpsc::unbounded_channel(); let (monitor_handle, monitor_event_receiver) = monitor::start_monitoring_thread(zmq_context.clone()); - // A channel used to notify whenever our own peer info changes on SC - let (own_peer_info_sender, own_peer_info_receiver) = tokio::sync::mpsc::unbounded_channel(); - let mut context = P2PContext { zmq_context, - key: p2p_key.encryption_key.clone(), + key: p2p_key.encryption_key, monitor_handle, authenticator, active_connections: ActiveConnectionWrapper::new(), x25519_to_account_id: Default::default(), - peer_infos: Default::default(), reconnect_context: ReconnectContext::new(reconnect_sender), incoming_message_sender, - own_peer_info_sender, our_account_id, - status: RegistrationStatus::Pending, }; debug!("Registering peer info for {} peers", current_peers.len()); for peer_info in current_peers { - context.handle_peer_update(peer_info); + context.add_or_update_peer(peer_info); } let incoming_message_receiver_ed25519 = context.start_listening_thread(port); - let (out_msg_sender, out_msg_receiver) = tokio::sync::mpsc::unbounded_channel(); - let (peer_update_sender, peer_update_receiver) = tokio::sync::mpsc::unbounded_channel(); - - let fut = context + context .control_loop( - out_msg_receiver, + outgoing_message_receiver, incoming_message_receiver_ed25519, peer_update_receiver, monitor_event_receiver, reconnect_receiver, ) - .instrument(info_span!("p2p")); - - (out_msg_sender, peer_update_sender, incoming_message_receiver, own_peer_info_receiver, fut) + .instrument(info_span!("p2p")) + .await; } fn disconnect_socket(_socket: ConnectedOutgoingSocket) { @@ -369,24 +348,27 @@ impl P2PContext { } fn send_message(&self, account_id: AccountId, payload: Vec) { - match self.active_connections.get(&account_id) { - Some(ConnectionState::Connected(socket)) => { - socket.send(payload); - P2P_MSG_SENT.inc(); - }, - Some(ConnectionState::ReconnectionScheduled) => { - // TODO: buffer the messages and send them later? - warn!("Failed to send message. Peer is scheduled for reconnection: {account_id}"); - }, - None => { - warn!("Failed to send message. Peer not registered: {account_id}") - }, + if let Some(peer) = self.active_connections.get(&account_id) { + match &peer.state { + ConnectionState::Connected(socket) => { + socket.send(payload); + P2P_MSG_SENT.inc(); + }, + ConnectionState::ReconnectionScheduled => { + // TODO: buffer the messages and send them later? + warn!( + "Failed to send message. Peer is scheduled for reconnection: {account_id}" + ); + }, + } + } else { + warn!("Failed to send message. Peer not registered: {account_id}") } } fn on_peer_update(&mut self, update: PeerUpdate) { match update { - PeerUpdate::Registered(peer_info) => self.handle_peer_update(peer_info), + PeerUpdate::Registered(peer_info) => self.add_or_update_peer(peer_info), PeerUpdate::Deregistered(account_id, _pubkey) => self.handle_peer_deregistration(account_id), } @@ -420,8 +402,13 @@ impl P2PContext { // on peer from disconnecting from "client side". // TODO: ensure that stale/inactive connections are terminated - if let Some(existing_socket) = self.active_connections.remove(&account_id) { - match existing_socket { + if account_id == self.our_account_id { + warn!("Received peer info deregistration of our own node!"); + return + } + + if let Some(peer) = self.active_connections.remove(&account_id) { + match peer.state { ConnectionState::Connected(existing_socket) => { disconnect_socket(existing_socket); }, @@ -429,14 +416,10 @@ impl P2PContext { self.reconnect_context.reset(&account_id); }, } - } else { - error!("Failed remove unknown peer: {account_id}"); - } - if let Some(existing_info) = self.peer_infos.remove(&account_id) { - self.clean_up_for_peer_pubkey(&existing_info.pubkey); + self.clean_up_for_peer_pubkey(&peer.info.pubkey); } else { - error!("Failed to remove peer info for unknown peer: {account_id}"); + error!("Failed remove unknown peer: {account_id}"); } // There may or may not be a reconnection delay for @@ -449,15 +432,11 @@ impl P2PContext { match event { MonitorEvent::ConnectionFailure(account_id) => { self.reconnect_context.schedule_reconnect(account_id.clone()); - if self - .active_connections - .insert(account_id.clone(), ConnectionState::ReconnectionScheduled) - .is_none() - { - // NOTE: this should not happen, but this guards against any surprising ZMQ - // behaviour + if let Some(peer) = self.active_connections.get_mut(&account_id) { + peer.state = ConnectionState::ReconnectionScheduled; + } else { error!("Unexpected attempt to reconnect to an unknown peer: {account_id}"); - }; + } }, MonitorEvent::ConnectionSuccess(account_id) => { self.reconnect_context.reset(&account_id); @@ -466,13 +445,13 @@ impl P2PContext { } fn reconnect_to_peer(&mut self, account_id: &AccountId) { - if let Some(peer_info) = self.peer_infos.get(account_id) { - match self.active_connections.remove(account_id) { - Some(ConnectionState::ReconnectionScheduled) => { + if let Some(peer) = self.active_connections.remove(account_id) { + match peer.state { + ConnectionState::ReconnectionScheduled => { info!("Reconnecting to peer: {}", account_id); - self.connect_to_peer(peer_info.clone()); + self.connect_to_peer(peer.info.clone()); }, - Some(ConnectionState::Connected(_)) => { + ConnectionState::Connected(_) => { // It is possible that while we were waiting to reconnect, // we received a peer info update and created a new "connection". // It is safe to drop the reconnection attempt even if this @@ -485,12 +464,9 @@ impl P2PContext { account_id ); }, - None => { - debug!("Will not reconnect to now deregistered peer: {}", account_id); - }, } } else { - error!("Failed to reconnect to peer {account_id}. (Peer info not found.)"); + debug!("Will not reconnect to now deregistered peer: {}", account_id); } } @@ -501,11 +477,17 @@ impl P2PContext { self.monitor_handle.start_monitoring_for(&socket, &peer); - let connected_socket = socket.connect(peer); + let connected_socket = socket.connect(peer.clone()); if self .active_connections - .insert(account_id.clone(), ConnectionState::Connected(connected_socket)) + .insert( + account_id.clone(), + ConnectionStateInfo { + state: ConnectionState::Connected(connected_socket), + info: peer, + }, + ) .is_some() { // This should not happen because we always remove existing connection/socket @@ -516,30 +498,20 @@ impl P2PContext { } } - fn handle_own_registration(&mut self, own_info: PeerInfo) { - debug!("Received own node's registration. Starting to connect to peers."); - - self.own_peer_info_sender.send(own_info).unwrap(); - - if let RegistrationStatus::Pending = &mut self.status { - let peers: Vec<_> = self.peer_infos.values().cloned().collect(); - // Connect to all outstanding peers - for peer in peers { - self.connect_to_peer(peer) - } - self.status = RegistrationStatus::Registered; - }; - } - fn add_or_update_peer(&mut self, peer: PeerInfo) { - if let Some(existing_state) = self.active_connections.remove(&peer.account_id) { + if peer.account_id == self.our_account_id { + // nothing to do + return + } + + if let Some(existing_peer_state) = self.active_connections.remove(&peer.account_id) { debug!( peer_info = peer.to_string(), "Received info for known peer with account id {}, updating info and reconnecting", &peer.account_id ); - match existing_state { + match existing_peer_state.state { ConnectionState::Connected(socket) => { disconnect_socket(socket); }, @@ -547,6 +519,8 @@ impl P2PContext { self.reconnect_context.reset(&peer.account_id); }, } + // Remove any state from previous peer info in case of update: + self.clean_up_for_peer_pubkey(&existing_peer_state.info.pubkey); } else { debug!( peer_info = peer.to_string(), @@ -555,35 +529,11 @@ impl P2PContext { ); } - // Remove any state from previous peer info in case of update: - if let Some(existing_info) = self.peer_infos.remove(&peer.account_id) { - self.clean_up_for_peer_pubkey(&existing_info.pubkey); - } - self.authenticator.add_peer(&peer); - self.peer_infos.insert(peer.account_id.clone(), peer.clone()); - self.x25519_to_account_id.insert(peer.pubkey, peer.account_id.clone()); - match &mut self.status { - RegistrationStatus::Pending => { - // We will connect to all peers in `self.peer_infos` once we receive our own - // registration - info!("Delaying connecting to {}", peer.account_id); - }, - RegistrationStatus::Registered => { - self.connect_to_peer(peer); - }, - } - } - - fn handle_peer_update(&mut self, peer: PeerInfo) { - if peer.account_id == self.our_account_id { - self.handle_own_registration(peer); - } else { - self.add_or_update_peer(peer); - } + self.connect_to_peer(peer); } /// Start listening for incoming p2p messages on a separate thread diff --git a/engine/src/p2p/core/tests.rs b/engine/src/p2p/core/tests.rs index 540662d622..c2f1d5edd7 100644 --- a/engine/src/p2p/core/tests.rs +++ b/engine/src/p2p/core/tests.rs @@ -27,7 +27,6 @@ struct Node { account_id: AccountId, msg_sender: UnboundedSender, peer_update_sender: UnboundedSender, - _own_peer_info_receiver: UnboundedReceiver, msg_receiver: UnboundedReceiver<(AccountId, Vec)>, } @@ -41,19 +40,34 @@ fn spawn_node( // Secret key does not implement clone: let secret = ed25519_dalek::SecretKey::from_bytes(&key.secret.to_bytes()).unwrap(); - let key = P2PKey::new(secret); - let (msg_sender, peer_update_sender, msg_receiver, own_peer_info_receiver, fut) = - super::start(&key, our_peer_info.port, peer_infos.to_vec(), account_id.clone()); - tokio::spawn(fut.instrument(info_span!("node", idx = idx))); + let (incoming_message_sender, incoming_message_receiver) = + tokio::sync::mpsc::unbounded_channel(); + + let (outgoing_message_sender, outgoing_message_receiver) = + tokio::sync::mpsc::unbounded_channel(); + + let (peer_update_sender, peer_update_receiver) = tokio::sync::mpsc::unbounded_channel(); + + tokio::spawn({ + super::start( + key, + our_peer_info.port, + peer_infos.to_vec(), + account_id.clone(), + incoming_message_sender, + outgoing_message_receiver, + peer_update_receiver, + ) + .instrument(info_span!("node", idx = idx)) + }); Node { account_id, - msg_sender, + msg_sender: outgoing_message_sender, peer_update_sender, - _own_peer_info_receiver: own_peer_info_receiver, - msg_receiver, + msg_receiver: incoming_message_receiver, } } diff --git a/engine/src/p2p/peer_info_submitter.rs b/engine/src/p2p/peer_info_submitter.rs index a5d76b5aaa..8faaab1128 100644 --- a/engine/src/p2p/peer_info_submitter.rs +++ b/engine/src/p2p/peer_info_submitter.rs @@ -1,21 +1,21 @@ use std::{ net::{IpAddr, Ipv6Addr}, sync::Arc, - time::Duration, }; use anyhow::Result; -use sp_core::H256; -use tokio::sync::mpsc::UnboundedReceiver; use codec::Encode; -use tracing::{debug, info}; -use utilities::{make_periodic_tick, Port}; +use sp_core::H256; +use tracing::info; +use utilities::Port; use crate::{ p2p::PeerInfo, state_chain_observer::client::{ - extrinsic_api::signed::SignedExtrinsicApi, storage_api::StorageApi, + chain_api::ChainApi, + extrinsic_api::signed::{SignedExtrinsicApi, UntilFinalized}, + storage_api::StorageApi, }, }; @@ -24,26 +24,12 @@ use super::P2PKey; async fn update_registered_peer_id( p2p_key: &P2PKey, state_chain_client: &Arc, - previous_registered_peer_info: &Option, ip_address: Ipv6Addr, cfe_port: Port, -) where +) -> Result<()> +where StateChainClient: SignedExtrinsicApi, { - let extra_info = match previous_registered_peer_info.as_ref() { - Some(peer_info) => { - format!( - "Node was previously registered with address [{}]:{}", - peer_info.ip, peer_info.port - ) - }, - None => String::from("Node previously did not have a registered address"), - }; - - info!( - "Registering node's peer info. Address: [{ip_address}]:{cfe_port}, x25519 public key: {}. {extra_info}.", - super::pk_to_string(&p2p_key.encryption_key.public_key)); - let peer_id = sp_core::ed25519::Public(p2p_key.signing_key.public.to_bytes()); let signature = { @@ -60,55 +46,51 @@ async fn update_registered_peer_id( // We sign over our account id signature: sp_core::ed25519::Signature::try_from(signature.as_ref()).unwrap(), }) - .await; + .await + .until_finalized() + .await?; + + Ok(()) } -pub(super) async fn start( - p2p_key: P2PKey, - state_chain_client: Arc, +pub(super) async fn ensure_peer_info_registered( + p2p_key: &P2PKey, + state_chain_client: &Arc, ip_address: IpAddr, cfe_port: Port, - mut previous_registered_peer_info: Option, - mut own_peer_info_receiver: UnboundedReceiver, + previous_registered_peer_info: Option, ) -> Result<()> where - StateChainClient: StorageApi + SignedExtrinsicApi + Send + Sync, + StateChainClient: StorageApi + SignedExtrinsicApi + ChainApi + Send + Sync, { let ip_address = match ip_address { IpAddr::V4(ipv4) => ipv4.to_ipv6_mapped(), IpAddr::V6(ipv6) => ipv6, }; - let public_encryption_key = &p2p_key.encryption_key.public_key; - - let mut update_interval = make_periodic_tick(Duration::from_secs(60), true); - - // Periodically try to update our address on chain until we receive - // a confirmation (own peer info that matches desired values) - loop { - tokio::select! { - Some(own_info) = own_peer_info_receiver.recv() => { - previous_registered_peer_info = Some(own_info); - } - _ = update_interval.tick() => { - if Some((ip_address, cfe_port, public_encryption_key)) != previous_registered_peer_info - .as_ref() - .map(|pi| (pi.ip, pi.port, &pi.pubkey)) - { - update_registered_peer_id( - &p2p_key, - &state_chain_client, - &previous_registered_peer_info, - ip_address, - cfe_port, - ) - .await; - } else { - debug!("Our peer info registration is up to date"); - break; - } - } - } + let public_encryption_key = p2p_key.encryption_key.public_key; + + if Some((ip_address, cfe_port, public_encryption_key)) != + previous_registered_peer_info.as_ref().map(|pi| (pi.ip, pi.port, pi.pubkey)) + { + let extra_info = match previous_registered_peer_info.as_ref() { + Some(peer_info) => { + format!( + "Node was previously registered with address [{}]:{}", + peer_info.ip, peer_info.port + ) + }, + None => String::from("Node previously did not have a registered address"), + }; + + info!( + "Registering node's peer info. Address: [{ip_address}]:{cfe_port}, x25519 public key: {}. {extra_info}.", + super::pk_to_string(&public_encryption_key)); + + update_registered_peer_id(p2p_key, state_chain_client, ip_address, cfe_port).await?; + info!("Our peer info registration is now up to date!"); + } else { + info!("Our peer info registration is already up to date!"); } Ok(()) @@ -119,7 +101,7 @@ pub async fn get_current_peer_infos( block_hash: H256, ) -> anyhow::Result> where - StateChainClient: StorageApi, + StateChainClient: StorageApi + ChainApi, { let peer_infos: Vec<_> = state_chain_client .storage_map::, Vec<_>>( diff --git a/state-chain/pallets/cf-validator/src/lib.rs b/state-chain/pallets/cf-validator/src/lib.rs index 87da67a09c..05f250968d 100644 --- a/state-chain/pallets/cf-validator/src/lib.rs +++ b/state-chain/pallets/cf-validator/src/lib.rs @@ -208,7 +208,8 @@ pub mod pallet { pub type AccountPeerMapping = StorageMap<_, Blake2_128Concat, T::AccountId, (Ed25519PublicKey, Port, Ipv6Addr)>; - /// Peers that are associated with account ids. + /// Ed25519 public keys (aka peer ids) that are associated with account ids. (We keep track + /// of them to ensure they don't somehow get reused between different account ids.) #[pallet::storage] #[pallet::getter(fn mapped_peer)] pub type MappedPeers = StorageMap<_, Blake2_128Concat, Ed25519PublicKey, ()>; @@ -608,6 +609,8 @@ pub mod pallet { let account_id = T::AccountRoleRegistry::ensure_validator(origin)?; + // Note: this signature is necessary to prevent "rogue key" attacks (by ensuring + // that `account_id` holds the corresponding secret key for `peer_id`) // Note: This signature verify doesn't need replay protection as you need the // account_id's private key to pass the above ensure_validator which has replay // protection. Note: Decode impl for peer_id's type doesn't detect invalid PublicKeys,