From 5ec2056f04b53897b572cfffff10943f375360c3 Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Thu, 7 Dec 2023 11:33:57 +0900
Subject: [PATCH 01/18] Compute prefix mappings

---
 consensus/types/src/subnet_id.rs | 49 ++++++++++++++++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/consensus/types/src/subnet_id.rs b/consensus/types/src/subnet_id.rs
index 82e12b7ec12..ba534c69331 100644
--- a/consensus/types/src/subnet_id.rs
+++ b/consensus/types/src/subnet_id.rs
@@ -2,6 +2,7 @@
 use crate::{AttestationData, ChainSpec, CommitteeIndex, Epoch, EthSpec, Slot};
 use safe_arith::{ArithError, SafeArith};
 use serde::{Deserialize, Serialize};
+use std::collections::HashMap;
 use std::ops::{Deref, DerefMut};
 use swap_or_not_shuffle::compute_shuffled_index;
 
@@ -127,6 +128,30 @@ impl SubnetId {
         });
         Ok((subnet_set_generator, valid_until_epoch.into()))
     }
+
+    /// Compute a mapping of `SubnetId` to `NodeId`s, which can be used for subnet discovery searches.
+    pub fn compute_prefix_mapping_for_epoch<T: EthSpec>(
+        epoch: Epoch,
+        spec: &ChainSpec,
+    ) -> Result<HashMap<SubnetId, Vec<ethereum_types::U256>>, &'static str> {
+        // simplify variable naming
+        let prefix_bits = spec.attestation_subnet_prefix_bits as u32;
+        let shuffling_prefix_bits = spec.attestation_subnet_shuffling_prefix_bits as u32;
+
+        let mut mapping = HashMap::new();
+
+        for i in 0..2_i32.pow(prefix_bits + shuffling_prefix_bits) {
+            let node_id =
+                ethereum_types::U256::from(i) << (256 - (prefix_bits + shuffling_prefix_bits));
+            let (subnets, _) = Self::compute_subnets_for_epoch::<T>(node_id, epoch, spec)?;
+
+            for subnet_id in subnets {
+                mapping.entry(subnet_id).or_insert(vec![]).push(node_id);
+            }
+        }
+
+        Ok(mapping)
+    }
 }
 
 impl Deref for SubnetId {
@@ -237,4 +262,28 @@ mod tests {
             assert_eq!(Epoch::from(expected_valid_time[x]), valid_time);
         }
     }
+
+    #[test]
+    fn compute_prefix_mapping_for_epoch_unit_test() {
+        let epochs = [
+            54321u64, 1017090249, 1827566880, 846255942, 766597383, 1204990115, 1616209495,
+            1774367616, 1484598751, 3525502229,
+        ]
+        .map(Epoch::from);
+
+        // Test mainnet
+        let spec = ChainSpec::mainnet();
+
+        for epoch in epochs {
+            let mapping =
+                SubnetId::compute_prefix_mapping_for_epoch::<crate::MainnetEthSpec>(epoch, &spec)
+                    .unwrap();
+
+            assert_eq!(mapping.len(), 64);
+
+            for (_, node_ids) in mapping.into_iter() {
+                assert!(node_ids.len() > 0);
+            }
+        }
+    }
 }

From 878027654f0ebc498168c7d9f0646fc1d7f5d710 Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Thu, 7 Dec 2023 12:10:12 +0900
Subject: [PATCH 02/18] Add PrefixMapping

---
 Cargo.lock | 2 +
 beacon_node/lighthouse_network/Cargo.toml | 2 +
 .../lighthouse_network/src/discovery/mod.rs | 79 +++++++++++++++++--
 .../src/service/behaviour.rs | 6 +-
 .../lighthouse_network/src/service/mod.rs | 14 ++--
 beacon_node/network/src/service.rs | 11 ++-
 6 files changed, 99 insertions(+), 15 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 21fea637f0a..26b5bd778c7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4506,6 +4506,7 @@ dependencies = [
  "dirs",
  "discv5",
  "error-chain",
+ "ethereum-types 0.14.1",
  "ethereum_ssz",
  "ethereum_ssz_derive",
  "exit-future",
@@ -4531,6 +4532,7 @@ dependencies = [
  "slog",
  "slog-async",
  "slog-term",
+ "slot_clock",
  "smallvec",
  "snap",
  "ssz_types",
diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml
index 125bbe9bc2f..7f37e47231b 100644
--- a/beacon_node/lighthouse_network/Cargo.toml
+++ b/beacon_node/lighthouse_network/Cargo.toml
@@ -12,6 +12,7 @@ types = { workspace = true }
 serde = { workspace = true }
 ethereum_ssz = { workspace = true }
 ethereum_ssz_derive = { workspace =
true } +ethereum-types = { workspace = true } tree_hash = { workspace = true } tree_hash_derive = { workspace = true } slog = { workspace = true } @@ -45,6 +46,7 @@ delay_map = { workspace = true } void = "1" libp2p-quic= { version = "0.9.2", features=["tokio"]} libp2p-mplex = "0.40.0" +slot_clock = { workspace = true } [dependencies.libp2p] version = "0.52" diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 388790568f0..e0ed8f5168a 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -19,6 +19,7 @@ pub use enr_ext::{peer_id_to_node_id, CombinedKeyExt, EnrExt}; pub use libp2p::identity::{Keypair, PublicKey}; use enr::{ATTESTATION_BITFIELD_ENR_KEY, ETH2_ENR_KEY, SYNC_COMMITTEE_BITFIELD_ENR_KEY}; +use ethereum_types::U256; use futures::prelude::*; use futures::stream::FuturesUnordered; use libp2p::multiaddr::Protocol; @@ -34,7 +35,9 @@ pub use libp2p::{ }; use lru::LruCache; use slog::{crit, debug, error, info, trace, warn}; +use slot_clock::SlotClock; use ssz::Encode; +use std::collections::hash_map::Entry; use std::{ collections::{HashMap, VecDeque}, net::{IpAddr, SocketAddr}, @@ -45,7 +48,7 @@ use std::{ time::{Duration, Instant}, }; use tokio::sync::mpsc; -use types::{EnrForkId, EthSpec}; +use types::{ChainSpec, EnrForkId, Epoch, EthSpec, SubnetId}; mod subnet_predicate; pub use subnet_predicate::subnet_predicate; @@ -156,9 +159,67 @@ enum EventStream { InActive, } +/// Stores mappings of `SubnetId` to `NodeId`s. +struct PrefixMapping { + slot_clock: TSlotClock, + chain_spec: ChainSpec, + mappings: HashMap>>, +} + +impl PrefixMapping { + fn new(slot_clock: TSlotClock, spec: ChainSpec) -> Self { + Self { + slot_clock, + chain_spec: spec, + mappings: HashMap::new(), + } + } + + /// Returns `NodeId` with the prefix that should be subscribed to the given `SubnetId`. + fn get(&mut self, subnet_id: &SubnetId) -> Result { + let current_epoch = self + .slot_clock + .now() + .ok_or("Failed to get the current epoch from clock")? + .epoch(TSpec::slots_per_epoch()); + + let select_node_id = + |mapping: &HashMap>| -> Result { + // TODO: randomize node_id that to be returned. + Ok(mapping + .get(subnet_id) + .map(|node_ids| node_ids[0].clone()) + .ok_or("No NodeId in the prefix mapping.")?) + }; + + let (node_id, vacant) = match self.mappings.entry(current_epoch) { + Entry::Occupied(entry) => { + let mapping = entry.get(); + (select_node_id(mapping)?, false) + } + Entry::Vacant(entry) => { + // compute prefixes + let computed_mapping = SubnetId::compute_prefix_mapping_for_epoch::( + current_epoch, + &self.chain_spec, + )?; + let mapping = entry.insert(computed_mapping); + (select_node_id(mapping)?, true) + } + }; + + // Remove expired mappings + if vacant { + self.mappings.retain(|epoch, _| epoch >= ¤t_epoch); + } + + Ok(node_id) + } +} + /// The main discovery service. This can be disabled via CLI arguements. When disabled the /// underlying processes are not started, but this struct still maintains our current ENR. -pub struct Discovery { +pub struct Discovery { /// A collection of seen live ENRs for quick lookup and to map peer-id's to ENRs. cached_enrs: LruCache, @@ -196,15 +257,20 @@ pub struct Discovery { /// Logger for the discovery behaviour. log: slog::Logger, + + /// Mappings of `SubnetId` to `NodeId`s for subnet discovery. 
+ prefix_mapping: PrefixMapping, } -impl Discovery { +impl Discovery { /// NOTE: Creating discovery requires running within a tokio execution environment. pub async fn new( local_key: Keypair, config: &NetworkConfig, network_globals: Arc>, log: &slog::Logger, + chain_spec: ChainSpec, + slot_clock: TSlotClock, ) -> error::Result { let log = log.clone(); @@ -329,6 +395,7 @@ impl Discovery { update_ports, log, enr_dir, + prefix_mapping: PrefixMapping::new(slot_clock, chain_spec), }) } @@ -920,7 +987,9 @@ impl Discovery { /* NetworkBehaviour Implementation */ -impl NetworkBehaviour for Discovery { +impl NetworkBehaviour + for Discovery +{ // Discovery is not a real NetworkBehaviour... type ConnectionHandler = ConnectionHandler; type ToSwarm = DiscoveredPeers; @@ -1131,7 +1200,7 @@ impl NetworkBehaviour for Discovery { } } -impl Discovery { +impl Discovery { fn on_dial_failure(&mut self, peer_id: Option, error: &DialError) { if let Some(peer_id) = peer_id { match error { diff --git a/beacon_node/lighthouse_network/src/service/behaviour.rs b/beacon_node/lighthouse_network/src/service/behaviour.rs index 8dd750429c3..de9116a0414 100644 --- a/beacon_node/lighthouse_network/src/service/behaviour.rs +++ b/beacon_node/lighthouse_network/src/service/behaviour.rs @@ -6,6 +6,7 @@ use crate::types::SnappyTransform; use libp2p::gossipsub; use libp2p::identify; use libp2p::swarm::NetworkBehaviour; +use slot_clock::SlotClock; use types::EthSpec; use super::api_types::RequestId; @@ -15,10 +16,11 @@ pub type SubscriptionFilter = pub type Gossipsub = gossipsub::Behaviour; #[derive(NetworkBehaviour)] -pub(crate) struct Behaviour +pub(crate) struct Behaviour where AppReqId: ReqId, TSpec: EthSpec, + TSlotClock: SlotClock, { /// Keep track of active and pending connections to enforce hard limits. pub connection_limits: libp2p::connection_limits::Behaviour, @@ -27,7 +29,7 @@ where /// The Eth2 RPC specified in the wire-0 protocol. pub eth2_rpc: RPC, TSpec>, /// Discv5 Discovery protocol. - pub discovery: Discovery, + pub discovery: Discovery, /// Keep regular connection to peers and disconnect if absent. // NOTE: The id protocol is used for initial interop. This will be removed by mainnet. /// Provides IP addresses and peer information. diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index a38b7b2f2ef..dfdc060d969 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -34,6 +34,7 @@ use libp2p::swarm::{Swarm, SwarmEvent}; use libp2p::PeerId; use libp2p::{identify, SwarmBuilder}; use slog::{crit, debug, info, o, trace, warn}; +use slot_clock::SlotClock; use std::path::PathBuf; use std::pin::Pin; use std::{ @@ -110,8 +111,8 @@ pub enum NetworkEvent { /// Builds the network behaviour that manages the core protocols of eth2. /// This core behaviour is managed by `Behaviour` which adds peer management to all core /// behaviours. -pub struct Network { - swarm: libp2p::swarm::Swarm>, +pub struct Network { + swarm: libp2p::swarm::Swarm>, /* Auxiliary Fields */ /// A collections of variables accessible outside the network service. network_globals: Arc>, @@ -136,11 +137,12 @@ pub struct Network { } /// Implements the combined behaviour for the libp2p service. 
-impl Network { +impl Network { pub async fn new( executor: task_executor::TaskExecutor, ctx: ServiceContext<'_>, log: &slog::Logger, + slot_clock: TSlotClock, ) -> error::Result<(Self, Arc>)> { let log = log.new(o!("service"=> "libp2p")); let mut config = ctx.config.clone(); @@ -300,6 +302,8 @@ impl Network { &config, network_globals.clone(), &log, + ctx.chain_spec.clone(), + slot_clock, ) .await?; // start searching for peers @@ -543,7 +547,7 @@ impl Network { &mut self.swarm.behaviour_mut().eth2_rpc } /// Discv5 Discovery protocol. - pub fn discovery_mut(&mut self) -> &mut Discovery { + pub fn discovery_mut(&mut self) -> &mut Discovery { &mut self.swarm.behaviour_mut().discovery } /// Provides IP addresses and peer information. @@ -564,7 +568,7 @@ impl Network { &self.swarm.behaviour().eth2_rpc } /// Discv5 Discovery protocol. - pub fn discovery(&self) -> &Discovery { + pub fn discovery(&self) -> &Discovery { &self.swarm.behaviour().discovery } /// Provides IP addresses and peer information. diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 03715dd99f2..7aa1f58bf65 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -172,7 +172,7 @@ pub struct NetworkService { /// A reference to the underlying beacon chain. beacon_chain: Arc>, /// The underlying libp2p service that drives all the network interactions. - libp2p: Network, + libp2p: Network, /// An attestation and subnet manager service. attestation_service: AttestationService, /// A sync committeee subnet manager service. @@ -289,8 +289,13 @@ impl NetworkService { }; // launch libp2p service - let (mut libp2p, network_globals) = - Network::new(executor.clone(), service_context, &network_log).await?; + let (mut libp2p, network_globals) = Network::new( + executor.clone(), + service_context, + &network_log, + beacon_chain.slot_clock.clone(), + ) + .await?; // Repopulate the DHT with stored ENR's if discovery is not disabled. if !config.disable_discovery { From 5f7788b72a9988af16e5f12dcb740a474f4f9cd9 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Mon, 11 Dec 2023 09:17:08 +0900 Subject: [PATCH 03/18] Run prefix searching --- .../lighthouse_network/src/discovery/mod.rs | 106 +++++++++++++----- 1 file changed, 80 insertions(+), 26 deletions(-) diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index e0ed8f5168a..bda00d9560d 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -131,6 +131,8 @@ impl std::fmt::Debug for SubnetQuery { enum QueryType { /// We are searching for subnet peers. Subnet(Vec), + /// We are prefix searching for subnet peers. + PrefixSearch(Vec), /// We are searching for more peers without ENR or time constraints. FindPeers, } @@ -791,16 +793,32 @@ impl Discovery { // build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate let subnet_predicate = subnet_predicate::(filtered_subnets, &self.log); - debug!( - self.log, - "Starting grouped subnet query"; - "subnets" => ?filtered_subnet_queries, - ); - self.start_query( - QueryType::Subnet(filtered_subnet_queries), - TARGET_PEERS_FOR_GROUPED_QUERY, - subnet_predicate, - ); + // TODO: Add CLI flag for this? 
+ let prefix_search_for_subnet = true; + + if prefix_search_for_subnet { + debug!( + self.log, + "Starting prefix search query"; + "subnets" => ?filtered_subnet_queries, + ); + self.start_query( + QueryType::PrefixSearch(filtered_subnet_queries), + TARGET_PEERS_FOR_GROUPED_QUERY, + subnet_predicate, + ); + } else { + debug!( + self.log, + "Starting grouped subnet query"; + "subnets" => ?filtered_subnet_queries, + ); + self.start_query( + QueryType::Subnet(filtered_subnet_queries), + TARGET_PEERS_FOR_GROUPED_QUERY, + subnet_predicate, + ); + } } } @@ -831,22 +849,58 @@ impl Discovery { && (enr.tcp4().is_some() || enr.tcp6().is_some()) }; - // General predicate - let predicate: Box bool + Send> = - Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && additional_predicate(enr)); - - // Build the future - let query_future = self - .discv5 - // Generate a random target node id. - .find_node_predicate(NodeId::random(), predicate, target_peers) - .map(|v| QueryResult { - query_type: query, - result: v, - }); + if let QueryType::PrefixSearch(subnet_queries) = query { + // Split the grouped subnet query into individual queries in order to prefix search. + for subnet_query in subnet_queries { + // Target node + let target_node = match &subnet_query.subnet { + Subnet::Attestation(subnet_id) => { + match self.prefix_mapping.get::(subnet_id) { + Ok(raw) => { + let raw_node_id: [u8; 32] = raw.into(); + NodeId::from(raw_node_id) + } + Err(e) => { + warn!(self.log, "Failed to get target NodeId"; "error" => %e, "subnet_id" => ?subnet_id); + continue; + } + } + } + Subnet::SyncCommittee(_) => NodeId::random(), + }; + // Build the future + let query_future = self + .discv5 + .find_node_predicate( + target_node, + Box::new(eth2_fork_predicate.clone()), + target_peers, + ) + .map(|v| QueryResult { + query_type: QueryType::PrefixSearch(vec![subnet_query]), + result: v, + }); - // Add the future to active queries, to be executed. - self.active_queries.push(Box::pin(query_future)); + // Add the future to active queries, to be executed. + self.active_queries.push(Box::pin(query_future)); + } + } else { + // General predicate + let predicate: Box bool + Send> = + Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && additional_predicate(enr)); + // Build the future + let query_future = self + .discv5 + // Generate a random target node id. + .find_node_predicate(NodeId::random(), predicate, target_peers) + .map(|v| QueryResult { + query_type: query, + result: v, + }); + + // Add the future to active queries, to be executed. + self.active_queries.push(Box::pin(query_future)); + } } /// Process the completed QueryResult returned from discv5. 
@@ -878,7 +932,7 @@ impl Discovery { } } } - QueryType::Subnet(queries) => { + QueryType::Subnet(queries) | QueryType::PrefixSearch(queries) => { let subnets_searched_for: Vec = queries.iter().map(|query| query.subnet).collect(); match query.result { From af0d757c8e7a7c37c22d881b148e5a80854db70b Mon Sep 17 00:00:00 2001 From: ackintosh Date: Fri, 15 Dec 2023 00:57:15 +0900 Subject: [PATCH 04/18] Move the discovery logic to `network` and avoid extra trait bounds in `lighthouse_network` --- Cargo.lock | 2 - beacon_node/lighthouse_network/Cargo.toml | 2 - .../lighthouse_network/src/discovery/mod.rs | 97 ++----------------- .../src/service/behaviour.rs | 6 +- .../lighthouse_network/src/service/mod.rs | 35 ++++--- .../lighthouse_network/src/types/mod.rs | 2 +- .../lighthouse_network/src/types/subnet.rs | 12 +++ beacon_node/network/src/lib.rs | 1 + beacon_node/network/src/prefix_mapping.rs | 71 ++++++++++++++ beacon_node/network/src/service.rs | 74 +++++++++++--- 10 files changed, 179 insertions(+), 123 deletions(-) create mode 100644 beacon_node/network/src/prefix_mapping.rs diff --git a/Cargo.lock b/Cargo.lock index 26b5bd778c7..21fea637f0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4506,7 +4506,6 @@ dependencies = [ "dirs", "discv5", "error-chain", - "ethereum-types 0.14.1", "ethereum_ssz", "ethereum_ssz_derive", "exit-future", @@ -4532,7 +4531,6 @@ dependencies = [ "slog", "slog-async", "slog-term", - "slot_clock", "smallvec", "snap", "ssz_types", diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index 7f37e47231b..125bbe9bc2f 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -12,7 +12,6 @@ types = { workspace = true } serde = { workspace = true } ethereum_ssz = { workspace = true } ethereum_ssz_derive = { workspace = true } -ethereum-types = { workspace = true } tree_hash = { workspace = true } tree_hash_derive = { workspace = true } slog = { workspace = true } @@ -46,7 +45,6 @@ delay_map = { workspace = true } void = "1" libp2p-quic= { version = "0.9.2", features=["tokio"]} libp2p-mplex = "0.40.0" -slot_clock = { workspace = true } [dependencies.libp2p] version = "0.52" diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index bda00d9560d..0a98eb9d6f7 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -8,7 +8,7 @@ pub mod enr_ext; // Allow external use of the lighthouse ENR builder use crate::service::TARGET_SUBNET_PEERS; -use crate::{error, Enr, NetworkConfig, NetworkGlobals, Subnet, SubnetDiscovery}; +use crate::{error, Enr, NetworkConfig, NetworkGlobals, Subnet}; use crate::{metrics, ClearDialError}; use discv5::{enr::NodeId, Discv5, Discv5Event}; pub use enr::{ @@ -19,7 +19,6 @@ pub use enr_ext::{peer_id_to_node_id, CombinedKeyExt, EnrExt}; pub use libp2p::identity::{Keypair, PublicKey}; use enr::{ATTESTATION_BITFIELD_ENR_KEY, ETH2_ENR_KEY, SYNC_COMMITTEE_BITFIELD_ENR_KEY}; -use ethereum_types::U256; use futures::prelude::*; use futures::stream::FuturesUnordered; use libp2p::multiaddr::Protocol; @@ -35,9 +34,7 @@ pub use libp2p::{ }; use lru::LruCache; use slog::{crit, debug, error, info, trace, warn}; -use slot_clock::SlotClock; use ssz::Encode; -use std::collections::hash_map::Entry; use std::{ collections::{HashMap, VecDeque}, net::{IpAddr, SocketAddr}, @@ -48,9 +45,10 @@ use std::{ time::{Duration, Instant}, }; use tokio::sync::mpsc; 
-use types::{ChainSpec, EnrForkId, Epoch, EthSpec, SubnetId}; +use types::{EnrForkId, EthSpec}; mod subnet_predicate; +use crate::types::TargetedSubnetDiscovery; pub use subnet_predicate::subnet_predicate; /// Local ENR storage filename. @@ -161,67 +159,9 @@ enum EventStream { InActive, } -/// Stores mappings of `SubnetId` to `NodeId`s. -struct PrefixMapping { - slot_clock: TSlotClock, - chain_spec: ChainSpec, - mappings: HashMap>>, -} - -impl PrefixMapping { - fn new(slot_clock: TSlotClock, spec: ChainSpec) -> Self { - Self { - slot_clock, - chain_spec: spec, - mappings: HashMap::new(), - } - } - - /// Returns `NodeId` with the prefix that should be subscribed to the given `SubnetId`. - fn get(&mut self, subnet_id: &SubnetId) -> Result { - let current_epoch = self - .slot_clock - .now() - .ok_or("Failed to get the current epoch from clock")? - .epoch(TSpec::slots_per_epoch()); - - let select_node_id = - |mapping: &HashMap>| -> Result { - // TODO: randomize node_id that to be returned. - Ok(mapping - .get(subnet_id) - .map(|node_ids| node_ids[0].clone()) - .ok_or("No NodeId in the prefix mapping.")?) - }; - - let (node_id, vacant) = match self.mappings.entry(current_epoch) { - Entry::Occupied(entry) => { - let mapping = entry.get(); - (select_node_id(mapping)?, false) - } - Entry::Vacant(entry) => { - // compute prefixes - let computed_mapping = SubnetId::compute_prefix_mapping_for_epoch::( - current_epoch, - &self.chain_spec, - )?; - let mapping = entry.insert(computed_mapping); - (select_node_id(mapping)?, true) - } - }; - - // Remove expired mappings - if vacant { - self.mappings.retain(|epoch, _| epoch >= ¤t_epoch); - } - - Ok(node_id) - } -} - /// The main discovery service. This can be disabled via CLI arguements. When disabled the /// underlying processes are not started, but this struct still maintains our current ENR. -pub struct Discovery { +pub struct Discovery { /// A collection of seen live ENRs for quick lookup and to map peer-id's to ENRs. cached_enrs: LruCache, @@ -259,20 +199,15 @@ pub struct Discovery { /// Logger for the discovery behaviour. log: slog::Logger, - - /// Mappings of `SubnetId` to `NodeId`s for subnet discovery. - prefix_mapping: PrefixMapping, } -impl Discovery { +impl Discovery { /// NOTE: Creating discovery requires running within a tokio execution environment. pub async fn new( local_key: Keypair, config: &NetworkConfig, network_globals: Arc>, log: &slog::Logger, - chain_spec: ChainSpec, - slot_clock: TSlotClock, ) -> error::Result { let log = log.clone(); @@ -397,7 +332,6 @@ impl Discovery { update_ports, log, enr_dir, - prefix_mapping: PrefixMapping::new(slot_clock, chain_spec), }) } @@ -432,7 +366,7 @@ impl Discovery { } /// Processes a request to search for more peers on a subnet. 
- pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec) { + pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec) { // If the discv5 service isn't running, ignore queries if !self.started { return; @@ -854,18 +788,7 @@ impl Discovery { for subnet_query in subnet_queries { // Target node let target_node = match &subnet_query.subnet { - Subnet::Attestation(subnet_id) => { - match self.prefix_mapping.get::(subnet_id) { - Ok(raw) => { - let raw_node_id: [u8; 32] = raw.into(); - NodeId::from(raw_node_id) - } - Err(e) => { - warn!(self.log, "Failed to get target NodeId"; "error" => %e, "subnet_id" => ?subnet_id); - continue; - } - } - } + Subnet::Attestation(_subnet_id) => NodeId::random(), Subnet::SyncCommittee(_) => NodeId::random(), }; // Build the future @@ -1041,9 +964,7 @@ impl Discovery { /* NetworkBehaviour Implementation */ -impl NetworkBehaviour - for Discovery -{ +impl NetworkBehaviour for Discovery { // Discovery is not a real NetworkBehaviour... type ConnectionHandler = ConnectionHandler; type ToSwarm = DiscoveredPeers; @@ -1254,7 +1175,7 @@ impl NetworkBehaviour } } -impl Discovery { +impl Discovery { fn on_dial_failure(&mut self, peer_id: Option, error: &DialError) { if let Some(peer_id) = peer_id { match error { diff --git a/beacon_node/lighthouse_network/src/service/behaviour.rs b/beacon_node/lighthouse_network/src/service/behaviour.rs index de9116a0414..8dd750429c3 100644 --- a/beacon_node/lighthouse_network/src/service/behaviour.rs +++ b/beacon_node/lighthouse_network/src/service/behaviour.rs @@ -6,7 +6,6 @@ use crate::types::SnappyTransform; use libp2p::gossipsub; use libp2p::identify; use libp2p::swarm::NetworkBehaviour; -use slot_clock::SlotClock; use types::EthSpec; use super::api_types::RequestId; @@ -16,11 +15,10 @@ pub type SubscriptionFilter = pub type Gossipsub = gossipsub::Behaviour; #[derive(NetworkBehaviour)] -pub(crate) struct Behaviour +pub(crate) struct Behaviour where AppReqId: ReqId, TSpec: EthSpec, - TSlotClock: SlotClock, { /// Keep track of active and pending connections to enforce hard limits. pub connection_limits: libp2p::connection_limits::Behaviour, @@ -29,7 +27,7 @@ where /// The Eth2 RPC specified in the wire-0 protocol. pub eth2_rpc: RPC, TSpec>, /// Discv5 Discovery protocol. - pub discovery: Discovery, + pub discovery: Discovery, /// Keep regular connection to peers and disconnect if absent. // NOTE: The id protocol is used for initial interop. This will be removed by mainnet. /// Provides IP addresses and peer information. 
diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index dfdc060d969..8edcc744c1d 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -15,9 +15,9 @@ use crate::rpc::*; use crate::service::behaviour::BehaviourEvent; pub use crate::service::behaviour::Gossipsub; use crate::types::{ - fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, - SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, - CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS, + fork_core_topics, subnet_from_topic_hash, DiscoveryTarget, GossipEncoding, GossipKind, + GossipTopic, SnappyTransform, Subnet, TargetedSubnetDiscovery, ALTAIR_CORE_TOPICS, + BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS, }; use crate::EnrExt; use crate::Eth2Enr; @@ -34,7 +34,6 @@ use libp2p::swarm::{Swarm, SwarmEvent}; use libp2p::PeerId; use libp2p::{identify, SwarmBuilder}; use slog::{crit, debug, info, o, trace, warn}; -use slot_clock::SlotClock; use std::path::PathBuf; use std::pin::Pin; use std::{ @@ -53,6 +52,7 @@ mod behaviour; mod gossip_cache; pub mod gossipsub_scoring_parameters; pub mod utils; + /// The number of peers we target per subnet for discovery queries. pub const TARGET_SUBNET_PEERS: usize = 6; @@ -111,8 +111,8 @@ pub enum NetworkEvent { /// Builds the network behaviour that manages the core protocols of eth2. /// This core behaviour is managed by `Behaviour` which adds peer management to all core /// behaviours. -pub struct Network { - swarm: libp2p::swarm::Swarm>, +pub struct Network { + swarm: libp2p::swarm::Swarm>, /* Auxiliary Fields */ /// A collections of variables accessible outside the network service. network_globals: Arc>, @@ -137,12 +137,11 @@ pub struct Network Network { +impl Network { pub async fn new( executor: task_executor::TaskExecutor, ctx: ServiceContext<'_>, log: &slog::Logger, - slot_clock: TSlotClock, ) -> error::Result<(Self, Arc>)> { let log = log.new(o!("service"=> "libp2p")); let mut config = ctx.config.clone(); @@ -302,8 +301,6 @@ impl Network Network &mut Discovery { + pub fn discovery_mut(&mut self) -> &mut Discovery { &mut self.swarm.behaviour_mut().discovery } /// Provides IP addresses and peer information. @@ -568,7 +565,7 @@ impl Network &Discovery { + pub fn discovery(&self) -> &Discovery { &self.swarm.behaviour().discovery } /// Provides IP addresses and peer information. @@ -916,13 +913,13 @@ impl Network) { + pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec) { // If discovery is not started or disabled, ignore the request if !self.discovery().started { return; } - let filtered: Vec = subnets_to_discover + let filtered: Vec = subnets_to_discover .into_iter() .filter(|s| { // Extend min_ttl of connected peers on required subnets @@ -1497,7 +1494,15 @@ impl Network { // Peer manager has requested a subnet discovery query for more peers. 
- self.discover_subnet_peers(subnets_to_discover); + let subnet_discoveries = subnets_to_discover + .into_iter() + .map(|s| TargetedSubnetDiscovery { + subnet: s.subnet, + min_ttl: s.min_ttl, + target: DiscoveryTarget::Random, + }) + .collect::>(); + self.discover_subnet_peers(subnet_discoveries); None } PeerManagerEvent::Ping(peer_id) => { diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index af9e9ef45d5..d08d73e8fb8 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -14,7 +14,7 @@ pub type Enr = discv5::enr::Enr; pub use globals::NetworkGlobals; pub use pubsub::{PubsubMessage, SnappyTransform}; -pub use subnet::{Subnet, SubnetDiscovery}; +pub use subnet::{DiscoveryTarget, Subnet, SubnetDiscovery, TargetedSubnetDiscovery}; pub use sync_state::{BackFillState, SyncState}; pub use topics::{ core_topics_to_subscribe, fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, diff --git a/beacon_node/lighthouse_network/src/types/subnet.rs b/beacon_node/lighthouse_network/src/types/subnet.rs index 50d28542bec..1942decfa07 100644 --- a/beacon_node/lighthouse_network/src/types/subnet.rs +++ b/beacon_node/lighthouse_network/src/types/subnet.rs @@ -1,3 +1,4 @@ +use discv5::enr::NodeId; use serde::Serialize; use std::time::Instant; use types::{SubnetId, SyncSubnetId}; @@ -26,3 +27,14 @@ impl PartialEq for SubnetDiscovery { self.subnet.eq(&other.subnet) } } + +pub struct TargetedSubnetDiscovery { + pub subnet: Subnet, + pub min_ttl: Option, + pub target: DiscoveryTarget, +} + +pub enum DiscoveryTarget { + Random, + Prefix(Vec), +} diff --git a/beacon_node/network/src/lib.rs b/beacon_node/network/src/lib.rs index da64368b16d..dfb8f5c9545 100644 --- a/beacon_node/network/src/lib.rs +++ b/beacon_node/network/src/lib.rs @@ -11,6 +11,7 @@ mod metrics; mod nat; mod network_beacon_processor; mod persisted_dht; +mod prefix_mapping; mod router; mod status; mod subnet_service; diff --git a/beacon_node/network/src/prefix_mapping.rs b/beacon_node/network/src/prefix_mapping.rs new file mode 100644 index 00000000000..f80e38ef40d --- /dev/null +++ b/beacon_node/network/src/prefix_mapping.rs @@ -0,0 +1,71 @@ +use ethereum_types::U256; +use lighthouse_network::discv5::enr::NodeId; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use types::{ChainSpec, Epoch, EthSpec, SubnetId}; + +/// Stores mappings of `SubnetId` to `NodeId`s. +pub(crate) struct PrefixMapping { + chain_spec: ChainSpec, + mappings: HashMap>>, +} + +impl PrefixMapping { + pub fn new(spec: ChainSpec) -> Self { + Self { + chain_spec: spec, + mappings: HashMap::new(), + } + } + + /// Returns `NodeId` with the prefix that should be subscribed to the given `SubnetId`. + pub fn get( + &mut self, + subnet_id: &SubnetId, + current_epoch: Epoch, + ) -> Result, &'static str> { + let (node_ids, vacant) = match self.mappings.entry(current_epoch) { + Entry::Occupied(entry) => { + let mapping = entry.get(); + let node_ids = mapping + .get(subnet_id) + .ok_or("No NodeId in the prefix mapping.")? 
+ .clone(); + (node_ids, false) + } + Entry::Vacant(entry) => { + // compute prefixes + let mut computed_mapping = SubnetId::compute_prefix_mapping_for_epoch::( + current_epoch, + &self.chain_spec, + )?; + // convert `U256`s to `NodeId`s + let mut mapping = HashMap::new(); + for (subnet_id, ids) in computed_mapping { + mapping.insert( + subnet_id, + ids.into_iter() + .map(|id| { + let raw_node_id: [u8; 32] = id.into(); + NodeId::from(raw_node_id) + }) + .collect::>(), + ) + } + let mapping = entry.insert(mapping); + let node_ids = mapping + .get(subnet_id) + .ok_or("No NodeId in the prefix mapping.")? + .clone(); + (node_ids, true) + } + }; + + // Remove expired mappings + if vacant { + self.mappings.retain(|epoch, _| epoch >= ¤t_epoch); + } + + Ok(node_ids) + } +} diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 7aa1f58bf65..bd02ef11b60 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -2,6 +2,7 @@ use super::sync::manager::RequestId as SyncId; use crate::nat::EstablishedUPnPMappings; use crate::network_beacon_processor::InvalidBlockStorage; use crate::persisted_dht::{clear_dht, load_dht, persist_dht}; +use crate::prefix_mapping::PrefixMapping; use crate::router::{Router, RouterMessage}; use crate::subnet_service::SyncCommitteeService; use crate::{error, metrics}; @@ -15,8 +16,9 @@ use futures::channel::mpsc::Sender; use futures::future::OptionFuture; use futures::prelude::*; use futures::StreamExt; +use lighthouse_network::discv5::enr::NodeId; use lighthouse_network::service::Network; -use lighthouse_network::types::GossipKind; +use lighthouse_network::types::{DiscoveryTarget, GossipKind, TargetedSubnetDiscovery}; use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance}; use lighthouse_network::{ rpc::{GoodbyeReason, RPCResponseErrorCode}, @@ -27,6 +29,7 @@ use lighthouse_network::{ MessageId, NetworkEvent, NetworkGlobals, PeerId, }; use slog::{crit, debug, error, info, o, trace, warn}; +use slot_clock::SlotClock; use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration}; use store::HotColdDB; use strum::IntoStaticStr; @@ -172,7 +175,7 @@ pub struct NetworkService { /// A reference to the underlying beacon chain. beacon_chain: Arc>, /// The underlying libp2p service that drives all the network interactions. - libp2p: Network, + libp2p: Network, /// An attestation and subnet manager service. attestation_service: AttestationService, /// A sync committeee subnet manager service. @@ -211,6 +214,8 @@ pub struct NetworkService { enable_light_client_server: bool, /// The logger for the network service. fork_context: Arc, + /// Mappings of `SubnetId` to `NodeId`s for subnet discovery. + prefix_mapping: PrefixMapping, log: slog::Logger, } @@ -289,13 +294,8 @@ impl NetworkService { }; // launch libp2p service - let (mut libp2p, network_globals) = Network::new( - executor.clone(), - service_context, - &network_log, - beacon_chain.slot_clock.clone(), - ) - .await?; + let (mut libp2p, network_globals) = + Network::new(executor.clone(), service_context, &network_log).await?; // Repopulate the DHT with stored ENR's if discovery is not disabled. 
if !config.disable_discovery { @@ -353,6 +353,7 @@ impl NetworkService { // create the network service and spawn the task let network_log = network_log.new(o!("service" => "network")); + let prefix_mapping = PrefixMapping::new(beacon_chain.spec.clone()); let network_service = NetworkService { beacon_chain, libp2p, @@ -373,6 +374,7 @@ impl NetworkService { metrics_update, gossipsub_parameter_update, fork_context, + prefix_mapping, log: network_log, enable_light_client_server: config.enable_light_client_server, }; @@ -858,6 +860,9 @@ impl NetworkService { } fn on_attestation_service_msg(&mut self, msg: SubnetServiceMessage) { + // TODO: add cli flag + let prefix_search_for_subnet = true; + match msg { SubnetServiceMessage::Subscribe(subnet) => { for fork_digest in self.required_gossip_fork_digests() { @@ -880,7 +885,46 @@ impl NetworkService { self.libp2p.update_enr_subnet(subnet, false); } SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => { - self.libp2p.discover_subnet_peers(subnets_to_discover); + let current_epoch = match self.beacon_chain.slot_clock.now() { + Some(slot) => slot.epoch(T::EthSpec::slots_per_epoch()), + None => { + warn!(self.log, "Failed to read slot clock"); + return; + } + }; + let subnet_discoveries = subnets_to_discover + .into_iter() + .map(|s| { + let target = if prefix_search_for_subnet { + let subnet_id = match &s.subnet { + Subnet::Attestation(subnet_id) => subnet_id, + Subnet::SyncCommittee(_) => unreachable!("sync committee subnet should not be here"), + }; + match self.prefix_mapping.get::(subnet_id, current_epoch) { + Ok(node_ids) => { + // TODO: randomize the order + DiscoveryTarget::Prefix(node_ids) + } + Err(e) => { + warn!( + self.log, + "Failed to get target NodeIds for prefix search. Falling back to random id."; "error" => %e, "subnet_id" => ?subnet_id, "current_epoch" => %current_epoch + ); + DiscoveryTarget::Random + } + } + } else { + DiscoveryTarget::Random + }; + + TargetedSubnetDiscovery { + subnet: s.subnet, + min_ttl: s.min_ttl, + target, + } + }) + .collect::>(); + self.libp2p.discover_subnet_peers(subnet_discoveries); } } } @@ -908,7 +952,15 @@ impl NetworkService { self.libp2p.update_enr_subnet(subnet, false); } SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => { - self.libp2p.discover_subnet_peers(subnets_to_discover); + let subnet_discoveries = subnets_to_discover + .into_iter() + .map(|s| TargetedSubnetDiscovery { + subnet: s.subnet, + min_ttl: s.min_ttl, + target: DiscoveryTarget::Random, + }) + .collect::>(); + self.libp2p.discover_subnet_peers(subnet_discoveries); } } } From 833271f2875416a50546fed5600c5088f78b83b7 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Fri, 22 Dec 2023 11:32:46 +0900 Subject: [PATCH 05/18] Add target field to SubnetQuery --- .../lighthouse_network/src/discovery/mod.rs | 37 ++++++++++++++++--- .../lighthouse_network/src/types/subnet.rs | 1 + 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 0a98eb9d6f7..7999130d947 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -48,7 +48,7 @@ use tokio::sync::mpsc; use types::{EnrForkId, EthSpec}; mod subnet_predicate; -use crate::types::TargetedSubnetDiscovery; +use crate::types::{DiscoveryTarget, TargetedSubnetDiscovery}; pub use subnet_predicate::subnet_predicate; /// Local ENR storage filename. 
@@ -97,6 +97,7 @@ struct SubnetQuery { subnet: Subnet, min_ttl: Option, retries: usize, + target: DiscoveryTarget, } impl SubnetQuery { @@ -377,7 +378,7 @@ impl Discovery { "subnets" => ?subnets_to_discover.iter().map(|s| s.subnet).collect::>() ); for subnet in subnets_to_discover { - self.add_subnet_query(subnet.subnet, subnet.min_ttl, 0); + self.add_subnet_query(subnet.subnet, subnet.min_ttl, 0, subnet.target); } } @@ -604,7 +605,13 @@ impl Discovery { /// Adds a subnet query if one doesn't exist. If a subnet query already exists, this /// updates the min_ttl field. - fn add_subnet_query(&mut self, subnet: Subnet, min_ttl: Option, retries: usize) { + fn add_subnet_query( + &mut self, + subnet: Subnet, + min_ttl: Option, + retries: usize, + target: DiscoveryTarget, + ) { // remove the entry and complete the query if greater than the maximum search count if retries > MAX_DISCOVERY_RETRY { debug!( @@ -636,6 +643,7 @@ impl Discovery { subnet, min_ttl, retries, + target, }); metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64); } @@ -862,7 +870,12 @@ impl Discovery { Ok(r) if r.is_empty() => { debug!(self.log, "Grouped subnet discovery query yielded no results."; "subnets_searched_for" => ?subnets_searched_for); queries.iter().for_each(|query| { - self.add_subnet_query(query.subnet, query.min_ttl, query.retries + 1); + self.add_subnet_query( + query.subnet, + query.min_ttl, + query.retries + 1, + query.target, + ); }) } Ok(r) => { @@ -889,7 +902,12 @@ impl Discovery { v.inc(); } // A subnet query has completed. Add back to the queue, incrementing retries. - self.add_subnet_query(query.subnet, query.min_ttl, query.retries + 1); + self.add_subnet_query( + query.subnet, + query.min_ttl, + query.retries + 1, + query.target, + ); // Check the specific subnet against the enr let subnet_predicate = @@ -1254,12 +1272,18 @@ mod tests { subnet_query.subnet, subnet_query.min_ttl, subnet_query.retries, + DiscoveryTarget::Random, ); assert_eq!(discovery.queued_queries.back(), Some(&subnet_query)); // New query should replace old query subnet_query.min_ttl = Some(now + Duration::from_secs(1)); - discovery.add_subnet_query(subnet_query.subnet, subnet_query.min_ttl, 1); + discovery.add_subnet_query( + subnet_query.subnet, + subnet_query.min_ttl, + 1, + DiscoveryTarget::Random, + ); subnet_query.retries += 1; @@ -1275,6 +1299,7 @@ mod tests { subnet_query.subnet, subnet_query.min_ttl, MAX_DISCOVERY_RETRY + 1, + DiscoveryTarget::Random, ); assert_eq!(discovery.queued_queries.len(), 0); diff --git a/beacon_node/lighthouse_network/src/types/subnet.rs b/beacon_node/lighthouse_network/src/types/subnet.rs index 1942decfa07..a3ed2fc8f56 100644 --- a/beacon_node/lighthouse_network/src/types/subnet.rs +++ b/beacon_node/lighthouse_network/src/types/subnet.rs @@ -34,6 +34,7 @@ pub struct TargetedSubnetDiscovery { pub target: DiscoveryTarget, } +#[derive(Copy)] pub enum DiscoveryTarget { Random, Prefix(Vec), From 00567366b35750ebfb4c54df70abe3d95297e1cd Mon Sep 17 00:00:00 2001 From: ackintosh Date: Fri, 22 Dec 2023 15:36:10 +0900 Subject: [PATCH 06/18] prefix search --- .../lighthouse_network/src/discovery/mod.rs | 154 ++++++++++-------- .../lighthouse_network/src/types/subnet.rs | 2 +- beacon_node/network/src/prefix_mapping.rs | 5 +- beacon_node/network/src/service.rs | 1 - 4 files changed, 87 insertions(+), 75 deletions(-) diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 7999130d947..6e29d964885 100644 --- 
a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -130,8 +130,6 @@ impl std::fmt::Debug for SubnetQuery { enum QueryType { /// We are searching for subnet peers. Subnet(Vec), - /// We are prefix searching for subnet peers. - PrefixSearch(Vec), /// We are searching for more peers without ENR or time constraints. FindPeers, } @@ -363,7 +361,9 @@ impl Discovery { let target_peers = std::cmp::min(FIND_NODE_QUERY_CLOSEST_PEERS, target_peers); debug!(self.log, "Starting a peer discovery request"; "target_peers" => target_peers ); self.find_peer_active = true; - self.start_query(QueryType::FindPeers, target_peers, |_| true); + self.start_query(QueryType::FindPeers, NodeId::random(), target_peers, |_| { + true + }); } /// Processes a request to search for more peers on a subnet. @@ -695,8 +695,6 @@ impl Discovery { /// Runs a discovery request for a given group of subnets. fn start_subnet_query(&mut self, subnet_queries: Vec) { - let mut filtered_subnets: Vec = Vec::new(); - // find subnet queries that are still necessary let filtered_subnet_queries: Vec = subnet_queries .into_iter() @@ -724,43 +722,84 @@ impl Discovery { "connected_peers_on_subnet" => peers_on_subnet, "peers_to_find" => target_peers, ); - - filtered_subnets.push(subnet_query.subnet); true }) .collect(); // Only start a discovery query if we have a subnet to look for. if !filtered_subnet_queries.is_empty() { - // build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate - let subnet_predicate = subnet_predicate::(filtered_subnets, &self.log); - - // TODO: Add CLI flag for this? - let prefix_search_for_subnet = true; - - if prefix_search_for_subnet { - debug!( - self.log, - "Starting prefix search query"; - "subnets" => ?filtered_subnet_queries, + let (random_search, prefix_search): (Vec<_>, Vec<_>) = filtered_subnet_queries + .into_iter() + .partition(|q| q.target == DiscoveryTarget::Random); + + if !random_search.is_empty() { + // build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate + let subnet_predicate = subnet_predicate::( + random_search.iter().map(|q| q.subnet).collect::>(), + &self.log, ); - self.start_query( - QueryType::PrefixSearch(filtered_subnet_queries), - TARGET_PEERS_FOR_GROUPED_QUERY, - subnet_predicate, - ); - } else { debug!( self.log, "Starting grouped subnet query"; - "subnets" => ?filtered_subnet_queries, + "subnets" => ?random_search, ); self.start_query( - QueryType::Subnet(filtered_subnet_queries), + QueryType::Subnet(random_search), + NodeId::random(), TARGET_PEERS_FOR_GROUPED_QUERY, subnet_predicate, ); } + + if !prefix_search.is_empty() { + // Split the grouped subnet query into individual queries in order to prefix search. 
+ for prefix_query in prefix_search { + // build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate + let subnet_predicate = + subnet_predicate::(vec![prefix_query.subnet], &self.log); + + // Target node + let target_node = match &prefix_query.target { + DiscoveryTarget::Random => unreachable!("target should be Prefix here"), + DiscoveryTarget::Prefix(node_ids) => { + if node_ids.is_empty() { + warn!( + self.log, + "No NodeIds given for prefix search, falling back to random search."; + "subnet" => ?prefix_query, + ); + NodeId::random() + } else { + let pos = prefix_query.retries % node_ids.len(); + if let Some(id) = node_ids.get(pos) { + id.clone() + } else { + warn!( + self.log, + "NodeIds were given for prefix search, but the offset was wrong. Choosing the first one instead."; + "subnet" => ?prefix_query, + ); + node_ids + .first() + .expect("Already checked that `node_ids` is not empty.") + .clone() + } + } + } + }; + debug!( + self.log, + "Starting prefix search query"; + "subnet" => ?prefix_query, + ); + self.start_query( + QueryType::Subnet(vec![prefix_query]), + target_node, + TARGET_PEERS_FOR_GROUPED_QUERY, + subnet_predicate, + ); + } + } } } @@ -771,7 +810,8 @@ impl Discovery { /// ENR. fn start_query( &mut self, - query: QueryType, + query_type: QueryType, + target_node: NodeId, target_peers: usize, additional_predicate: impl Fn(&Enr) -> bool + Send + 'static, ) { @@ -791,47 +831,21 @@ impl Discovery { && (enr.tcp4().is_some() || enr.tcp6().is_some()) }; - if let QueryType::PrefixSearch(subnet_queries) = query { - // Split the grouped subnet query into individual queries in order to prefix search. - for subnet_query in subnet_queries { - // Target node - let target_node = match &subnet_query.subnet { - Subnet::Attestation(_subnet_id) => NodeId::random(), - Subnet::SyncCommittee(_) => NodeId::random(), - }; - // Build the future - let query_future = self - .discv5 - .find_node_predicate( - target_node, - Box::new(eth2_fork_predicate.clone()), - target_peers, - ) - .map(|v| QueryResult { - query_type: QueryType::PrefixSearch(vec![subnet_query]), - result: v, - }); + // General predicate + let predicate: Box bool + Send> = + Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && additional_predicate(enr)); + // Build the future + let query_future = self + .discv5 + // Generate a random target node id. + .find_node_predicate(target_node, predicate, target_peers) + .map(|v| QueryResult { + query_type, + result: v, + }); - // Add the future to active queries, to be executed. - self.active_queries.push(Box::pin(query_future)); - } - } else { - // General predicate - let predicate: Box bool + Send> = - Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && additional_predicate(enr)); - // Build the future - let query_future = self - .discv5 - // Generate a random target node id. - .find_node_predicate(NodeId::random(), predicate, target_peers) - .map(|v| QueryResult { - query_type: query, - result: v, - }); - - // Add the future to active queries, to be executed. - self.active_queries.push(Box::pin(query_future)); - } + // Add the future to active queries, to be executed. + self.active_queries.push(Box::pin(query_future)); } /// Process the completed QueryResult returned from discv5. 
@@ -863,7 +877,7 @@ impl Discovery { } } } - QueryType::Subnet(queries) | QueryType::PrefixSearch(queries) => { + QueryType::Subnet(queries) => { let subnets_searched_for: Vec = queries.iter().map(|query| query.subnet).collect(); match query.result { @@ -874,7 +888,7 @@ impl Discovery { query.subnet, query.min_ttl, query.retries + 1, - query.target, + query.target.clone(), ); }) } @@ -906,7 +920,7 @@ impl Discovery { query.subnet, query.min_ttl, query.retries + 1, - query.target, + query.target.clone(), ); // Check the specific subnet against the enr diff --git a/beacon_node/lighthouse_network/src/types/subnet.rs b/beacon_node/lighthouse_network/src/types/subnet.rs index a3ed2fc8f56..2921ce99fae 100644 --- a/beacon_node/lighthouse_network/src/types/subnet.rs +++ b/beacon_node/lighthouse_network/src/types/subnet.rs @@ -34,7 +34,7 @@ pub struct TargetedSubnetDiscovery { pub target: DiscoveryTarget, } -#[derive(Copy)] +#[derive(Clone, PartialEq)] pub enum DiscoveryTarget { Random, Prefix(Vec), diff --git a/beacon_node/network/src/prefix_mapping.rs b/beacon_node/network/src/prefix_mapping.rs index f80e38ef40d..a0d0ae59a93 100644 --- a/beacon_node/network/src/prefix_mapping.rs +++ b/beacon_node/network/src/prefix_mapping.rs @@ -1,4 +1,3 @@ -use ethereum_types::U256; use lighthouse_network::discv5::enr::NodeId; use std::collections::hash_map::Entry; use std::collections::HashMap; @@ -35,7 +34,7 @@ impl PrefixMapping { } Entry::Vacant(entry) => { // compute prefixes - let mut computed_mapping = SubnetId::compute_prefix_mapping_for_epoch::( + let computed_mapping = SubnetId::compute_prefix_mapping_for_epoch::( current_epoch, &self.chain_spec, )?; @@ -50,7 +49,7 @@ impl PrefixMapping { NodeId::from(raw_node_id) }) .collect::>(), - ) + ); } let mapping = entry.insert(mapping); let node_ids = mapping diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index bd02ef11b60..1a141fb61f3 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -16,7 +16,6 @@ use futures::channel::mpsc::Sender; use futures::future::OptionFuture; use futures::prelude::*; use futures::StreamExt; -use lighthouse_network::discv5::enr::NodeId; use lighthouse_network::service::Network; use lighthouse_network::types::{DiscoveryTarget, GossipKind, TargetedSubnetDiscovery}; use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance}; From ba51b27807476b67a540cfb34e931a2a25540ca0 Mon Sep 17 00:00:00 2001 From: ackintosh Date: Fri, 22 Dec 2023 15:44:27 +0900 Subject: [PATCH 07/18] Fix tests --- beacon_node/lighthouse_network/src/discovery/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 6e29d964885..d2b962d899e 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -1281,12 +1281,13 @@ mod tests { subnet: Subnet::Attestation(SubnetId::new(1)), min_ttl: Some(now), retries: 0, + target: DiscoveryTarget::Prefix(vec![NodeId::random(), NodeId::random()]), }; discovery.add_subnet_query( subnet_query.subnet, subnet_query.min_ttl, subnet_query.retries, - DiscoveryTarget::Random, + subnet_query.target.clone(), ); assert_eq!(discovery.queued_queries.back(), Some(&subnet_query)); @@ -1296,7 +1297,7 @@ mod tests { subnet_query.subnet, subnet_query.min_ttl, 1, - DiscoveryTarget::Random, + subnet_query.target.clone(), ); subnet_query.retries += 1; 
@@ -1313,7 +1314,7 @@ mod tests {
                 subnet_query.subnet,
                 subnet_query.min_ttl,
                 MAX_DISCOVERY_RETRY + 1,
-                DiscoveryTarget::Random,
+                subnet_query.target.clone(),
             );
             assert_eq!(discovery.queued_queries.len(), 0);
@@ -1346,11 +1347,13 @@ mod tests {
                 subnet: Subnet::Attestation(SubnetId::new(1)),
                 min_ttl: instant1,
                 retries: 0,
+                target: DiscoveryTarget::Prefix(vec![NodeId::random(), NodeId::random()]),
             },
             SubnetQuery {
                 subnet: Subnet::Attestation(SubnetId::new(2)),
                 min_ttl: instant2,
                 retries: 0,
+                target: DiscoveryTarget::Prefix(vec![NodeId::random(), NodeId::random()]),
             },
         ]);

From d1dd96dc6850a99530ad414f4dd165f9224bc414 Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Fri, 22 Dec 2023 16:12:26 +0900
Subject: [PATCH 08/18] Shuffle node_ids

---
 beacon_node/network/src/service.rs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs
index 1a141fb61f3..93812f738c1 100644
--- a/beacon_node/network/src/service.rs
+++ b/beacon_node/network/src/service.rs
@@ -30,6 +30,7 @@ use lighthouse_network::{
 use slog::{crit, debug, error, info, o, trace, warn};
 use slot_clock::SlotClock;
 use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration};
+use rand::seq::SliceRandom;
 use store::HotColdDB;
 use strum::IntoStaticStr;
 use task_executor::ShutdownReason;
@@ -900,8 +901,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
                                 Subnet::SyncCommittee(_) => unreachable!("sync committee subnet should not be here"),
                             };
                             match self.prefix_mapping.get::<T::EthSpec>(subnet_id, current_epoch) {
-                                Ok(node_ids) => {
-                                    // TODO: randomize the order
+                                Ok(mut node_ids) => {
+                                    node_ids.shuffle(&mut rand::thread_rng());
                                     DiscoveryTarget::Prefix(node_ids)
                                 }
                                 Err(e) => {

From 7be80bda0dca8d7cf9919ebdcd2b6f89f3ba43d1 Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Mon, 25 Dec 2023 16:35:21 +0900
Subject: [PATCH 09/18] Add CLI flag to enable prefix search

---
 beacon_node/lighthouse_network/src/config.rs | 4 ++++
 beacon_node/network/src/service.rs           | 8 ++++----
 beacon_node/src/cli.rs                       | 6 ++++++
 beacon_node/src/config.rs                    | 5 +++++
 4 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs
index 47bd7b86679..abfbc24cc80 100644
--- a/beacon_node/lighthouse_network/src/config.rs
+++ b/beacon_node/lighthouse_network/src/config.rs
@@ -158,6 +158,9 @@ pub struct Config {
     /// Configuration for the inbound rate limiter (requests received by this node).
     pub inbound_rate_limiter_config: Option<InboundRateLimiterConfig>,
+
+    /// Whether prefix search for attestation subnets is enabled.
+    pub prefix_search_for_subnet: bool,
 }

 impl Config {
@@ -375,6 +378,7 @@ impl Default for Config {
             outbound_rate_limiter_config: None,
             invalid_block_storage: None,
             inbound_rate_limiter_config: None,
+            prefix_search_for_subnet: false,
         }
     }
 }
diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs
index 93812f738c1..5569cebdcd9 100644
--- a/beacon_node/network/src/service.rs
+++ b/beacon_node/network/src/service.rs
@@ -214,6 +214,8 @@ pub struct NetworkService<T: BeaconChainTypes> {
     enable_light_client_server: bool,
     /// The logger for the network service.
     fork_context: Arc<ForkContext>,
+    /// Prefix search for attestation subnets.
+    prefix_search_for_subnet: bool,
     /// Mappings of `SubnetId` to `NodeId`s for subnet discovery.
     prefix_mapping: PrefixMapping,
     log: slog::Logger,
@@ -374,6 +376,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
             metrics_update,
             gossipsub_parameter_update,
             fork_context,
+            prefix_search_for_subnet: config.prefix_search_for_subnet,
             prefix_mapping,
             log: network_log,
             enable_light_client_server: config.enable_light_client_server,
@@ -860,9 +863,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
     }

     fn on_attestation_service_msg(&mut self, msg: SubnetServiceMessage) {
-        // TODO: add cli flag
-        let prefix_search_for_subnet = true;
-
         match msg {
             SubnetServiceMessage::Subscribe(subnet) => {
                 for fork_digest in self.required_gossip_fork_digests() {
@@ -895,7 +895,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
                 let subnet_discoveries = subnets_to_discover
                     .into_iter()
                     .map(|s| {
-                        let target = if prefix_search_for_subnet {
+                        let target = if self.prefix_search_for_subnet {
                             let subnet_id = match &s.subnet {
                                 Subnet::Attestation(subnet_id) => subnet_id,
                                 Subnet::SyncCommittee(_) => unreachable!("sync committee subnet should not be here"),
diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs
index d76f2f375f4..6f358c34731 100644
--- a/beacon_node/src/cli.rs
+++ b/beacon_node/src/cli.rs
@@ -1288,5 +1288,11 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
                 .default_value("64")
                 .takes_value(true)
         )
+        .arg(
+            Arg::with_name("prefix-search-for-subnet")
+                .long("prefix-search-for-subnet")
+                .help("Enable prefix search for attestation subnets. Disabled by default.")
+                .takes_value(false),
+        )
         .group(ArgGroup::with_name("enable_http").args(&["http", "gui", "staking"]).multiple(true))
 }
diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs
index 9ceab47f336..8534c9f185a 100644
--- a/beacon_node/src/config.rs
+++ b/beacon_node/src/config.rs
@@ -1395,6 +1395,11 @@ pub fn set_network_config(
             Some(config_str.parse()?)
         }
     };
+
+    if cli_args.is_present("prefix-search-for-subnet") {
+        config.prefix_search_for_subnet = true;
+    }
+
     Ok(())
 }

From 8b2bdfac8a7e54b8df843b9cd265791563e6c0af Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Mon, 25 Dec 2023 17:04:01 +0900
Subject: [PATCH 10/18] Refactor PrefixMapping

---
 beacon_node/network/src/prefix_mapping.rs | 72 ++++++++++-------------
 1 file changed, 31 insertions(+), 41 deletions(-)

diff --git a/beacon_node/network/src/prefix_mapping.rs b/beacon_node/network/src/prefix_mapping.rs
index a0d0ae59a93..46181ef1157 100644
--- a/beacon_node/network/src/prefix_mapping.rs
+++ b/beacon_node/network/src/prefix_mapping.rs
@@ -1,19 +1,20 @@
 use lighthouse_network::discv5::enr::NodeId;
-use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use types::{ChainSpec, Epoch, EthSpec, SubnetId};

 /// Stores mappings of `SubnetId` to `NodeId`s.
 pub(crate) struct PrefixMapping {
     chain_spec: ChainSpec,
-    mappings: HashMap<Epoch, HashMap<SubnetId, Vec<NodeId>>>,
+    epoch: Epoch,
+    mapping: HashMap<SubnetId, Vec<NodeId>>,
 }

 impl PrefixMapping {
     pub fn new(spec: ChainSpec) -> Self {
         Self {
             chain_spec: spec,
-            mappings: HashMap::new(),
+            epoch: Epoch::new(0),
+            mapping: HashMap::new(),
         }
     }

@@ -23,48 +24,37 @@ impl PrefixMapping {
         subnet_id: &SubnetId,
         current_epoch: Epoch,
     ) -> Result<Vec<NodeId>, &'static str> {
-        let (node_ids, vacant) = match self.mappings.entry(current_epoch) {
-            Entry::Occupied(entry) => {
-                let mapping = entry.get();
-                let node_ids = mapping
-                    .get(subnet_id)
-                    .ok_or("No NodeId in the prefix mapping.")?
-                    .clone();
-                (node_ids, false)
-            }
-            Entry::Vacant(entry) => {
-                // compute prefixes
-                let computed_mapping = SubnetId::compute_prefix_mapping_for_epoch::<TSpec>(
-                    current_epoch,
-                    &self.chain_spec,
-                )?;
-                // convert `U256`s to `NodeId`s
-                let mut mapping = HashMap::new();
-                for (subnet_id, ids) in computed_mapping {
-                    mapping.insert(
-                        subnet_id,
-                        ids.into_iter()
-                            .map(|id| {
-                                let raw_node_id: [u8; 32] = id.into();
-                                NodeId::from(raw_node_id)
-                            })
-                            .collect::<Vec<_>>(),
-                    );
-                }
-                let mapping = entry.insert(mapping);
-                let node_ids = mapping
-                    .get(subnet_id)
-                    .ok_or("No NodeId in the prefix mapping.")?
-                    .clone();
-                (node_ids, true)
+        if self.epoch != current_epoch {
+            // compute prefixes
+            let computed_mapping = SubnetId::compute_prefix_mapping_for_epoch::<TSpec>(
+                current_epoch,
+                &self.chain_spec,
+            )?;
+
+            // convert `U256`s to `NodeId`s
+            let mut mapping = HashMap::new();
+            for (subnet_id, ids) in computed_mapping {
+                mapping.insert(
+                    subnet_id,
+                    ids.into_iter()
+                        .map(|id| {
+                            let raw_node_id: [u8; 32] = id.into();
+                            NodeId::from(raw_node_id)
+                        })
+                        .collect::<Vec<_>>(),
+                );
             }
-        };
-        // Remove expired mappings
-        if vacant {
-            self.mappings.retain(|epoch, _| epoch >= &current_epoch);
+            self.mapping = mapping;
+            self.epoch = current_epoch;
         }
+        let node_ids = self
+            .mapping
+            .get(subnet_id)
+            .ok_or("No NodeId in the prefix mapping.")?
+            .clone();
+        Ok(node_ids)
     }
 }

From a485e256e4a8163dc7b1ce3dac8bc6bc56dca259 Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Mon, 25 Dec 2023 17:04:44 +0900
Subject: [PATCH 11/18] cargo fmt

---
 beacon_node/network/src/service.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs
index 5569cebdcd9..120865d726d 100644
--- a/beacon_node/network/src/service.rs
+++ b/beacon_node/network/src/service.rs
@@ -27,10 +27,10 @@ use lighthouse_network::{
     types::{core_topics_to_subscribe, GossipEncoding, GossipTopic},
     MessageId, NetworkEvent, NetworkGlobals, PeerId,
 };
+use rand::seq::SliceRandom;
 use slog::{crit, debug, error, info, o, trace, warn};
 use slot_clock::SlotClock;
 use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration};
-use rand::seq::SliceRandom;
 use store::HotColdDB;
 use strum::IntoStaticStr;
 use task_executor::ShutdownReason;

From 7b85df9d5ac8154341e98ea4365b581218f741c5 Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Tue, 26 Dec 2023 15:51:39 +0900
Subject: [PATCH 12/18] Add test for PrefixMapping

---
 beacon_node/network/src/prefix_mapping.rs | 45 ++++++++++++++++-------
 beacon_node/network/src/service.rs        |  2 +-
 2 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/beacon_node/network/src/prefix_mapping.rs b/beacon_node/network/src/prefix_mapping.rs
index 46181ef1157..f4956949996 100644
--- a/beacon_node/network/src/prefix_mapping.rs
+++ b/beacon_node/network/src/prefix_mapping.rs
@@ -1,8 +1,8 @@
 use lighthouse_network::discv5::enr::NodeId;
 use std::collections::HashMap;
-use types::{ChainSpec, Epoch, EthSpec, SubnetId};
+use types::{ChainSpec, Epoch, EthSpec, MainnetEthSpec, SubnetId};

-/// Stores mappings of `SubnetId` to `NodeId`s.
+/// Stores mappings of `SubnetId` to `NodeId`s for prefix search.
 pub(crate) struct PrefixMapping {
     chain_spec: ChainSpec,
     epoch: Epoch,
@@ -18,8 +18,8 @@ impl PrefixMapping {
         }
     }

-    /// Returns `NodeId` with the prefix that should be subscribed to the given `SubnetId`.
-    pub fn get<TSpec: EthSpec>(
+    /// Returns `NodeId`s with the prefix that should be subscribed to the given `SubnetId` and `Epoch`.
+    pub fn get_target_nodes<TSpec: EthSpec>(
         &mut self,
         subnet_id: &SubnetId,
         current_epoch: Epoch,
@@ -31,21 +31,22 @@ impl PrefixMapping {
                 &self.chain_spec,
             )?;

-            // convert `U256`s to `NodeId`s
-            let mut mapping = HashMap::new();
-            for (subnet_id, ids) in computed_mapping {
-                mapping.insert(
-                    subnet_id,
-                    ids.into_iter()
+            self.mapping = computed_mapping
+                .into_iter()
+                .map(|(subnet_id, ids)| {
+                    // convert `U256`s to `NodeId`s
+                    let node_ids = ids
+                        .into_iter()
                         .map(|id| {
                             let raw_node_id: [u8; 32] = id.into();
                             NodeId::from(raw_node_id)
                         })
-                        .collect::<Vec<_>>(),
-                );
-            }
+                        .collect::<Vec<_>>();
+
+                    (subnet_id, node_ids)
+                })
+                .collect();

-            self.mapping = mapping;
             self.epoch = current_epoch;
         }
@@ -58,3 +59,19 @@ impl PrefixMapping {
     }
 }
+
+#[test]
+fn test_get_target_nodes() {
+    let mut prefix_mapping = PrefixMapping::new(ChainSpec::mainnet());
+    assert_eq!(prefix_mapping.epoch, Epoch::new(0));
+    assert_eq!(prefix_mapping.mapping.len(), 0);
+
+    let current_epoch = Epoch::new(54321);
+    let node_ids = prefix_mapping
+        .get_target_nodes::<MainnetEthSpec>(&SubnetId::new(1), current_epoch)
+        .unwrap();
+    assert!(!node_ids.is_empty());
+
+    assert_eq!(prefix_mapping.epoch, current_epoch);
+    assert_ne!(prefix_mapping.mapping.len(), 0);
+}
diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs
index 120865d726d..0b0ca43d99a 100644
--- a/beacon_node/network/src/service.rs
+++ b/beacon_node/network/src/service.rs
@@ -900,7 +900,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
                                 Subnet::Attestation(subnet_id) => subnet_id,
                                 Subnet::SyncCommittee(_) => unreachable!("sync committee subnet should not be here"),
                             };
-                            match self.prefix_mapping.get::<T::EthSpec>(subnet_id, current_epoch) {
+                            match self.prefix_mapping.get_target_nodes::<T::EthSpec>(subnet_id, current_epoch) {
                                 Ok(mut node_ids) => {
                                     node_ids.shuffle(&mut rand::thread_rng());
                                     DiscoveryTarget::Prefix(node_ids)

From 5e605885f2403b091d2da9b2a1260fbd0361018c Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Tue, 26 Dec 2023 16:04:00 +0900
Subject: [PATCH 13/18] Add target field to Debug impl

---
 beacon_node/lighthouse_network/src/discovery/mod.rs | 1 +
 beacon_node/lighthouse_network/src/types/subnet.rs  | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs
index d2b962d899e..98248ced8cc 100644
--- a/beacon_node/lighthouse_network/src/discovery/mod.rs
+++ b/beacon_node/lighthouse_network/src/discovery/mod.rs
@@ -122,6 +122,7 @@ impl std::fmt::Debug for SubnetQuery {
             .field("subnet", &self.subnet)
             .field("min_ttl_secs", &min_ttl_secs)
             .field("retries", &self.retries)
+            .field("target", &self.target)
             .finish()
     }
 }
diff --git a/beacon_node/lighthouse_network/src/types/subnet.rs b/beacon_node/lighthouse_network/src/types/subnet.rs
index 2921ce99fae..a811a12797d 100644
--- a/beacon_node/lighthouse_network/src/types/subnet.rs
+++ b/beacon_node/lighthouse_network/src/types/subnet.rs
@@ -34,7 +34,7 @@ pub struct TargetedSubnetDiscovery {
     pub target: DiscoveryTarget,
 }

-#[derive(Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq)]
 pub enum DiscoveryTarget {
     Random,
     Prefix(Vec<NodeId>),
 }

From 6f6915c3f431350cdc94ddd2e976c70f0ba1d9c1 Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Tue, 26 Dec 2023 16:08:54 +0900
Subject: [PATCH 14/18] Add test module to improve imports

---
 beacon_node/network/src/prefix_mapping.rs | 32 ++++++++++++++---------
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git a/beacon_node/network/src/prefix_mapping.rs b/beacon_node/network/src/prefix_mapping.rs
index f4956949996..9c72dc48838 100644
--- a/beacon_node/network/src/prefix_mapping.rs
+++ b/beacon_node/network/src/prefix_mapping.rs
@@ -1,6 +1,6 @@
 use lighthouse_network::discv5::enr::NodeId;
 use std::collections::HashMap;
-use types::{ChainSpec, Epoch, EthSpec, MainnetEthSpec, SubnetId};
+use types::{ChainSpec, Epoch, EthSpec, SubnetId};

 /// Stores mappings of `SubnetId` to `NodeId`s for prefix search.
 pub(crate) struct PrefixMapping {
     chain_spec: ChainSpec,
@@ -60,18 +60,24 @@ impl PrefixMapping {
     }
 }

-#[test]
-fn test_get_target_nodes() {
-    let mut prefix_mapping = PrefixMapping::new(ChainSpec::mainnet());
-    assert_eq!(prefix_mapping.epoch, Epoch::new(0));
-    assert_eq!(prefix_mapping.mapping.len(), 0);
+#[cfg(test)]
+mod test {
+    use super::*;
+    use types::MainnetEthSpec;

-    let current_epoch = Epoch::new(54321);
-    let node_ids = prefix_mapping
-        .get_target_nodes::<MainnetEthSpec>(&SubnetId::new(1), current_epoch)
-        .unwrap();
-    assert!(!node_ids.is_empty());
+    #[test]
+    fn test_get_target_nodes() {
+        let mut prefix_mapping = PrefixMapping::new(ChainSpec::mainnet());
+        assert_eq!(prefix_mapping.epoch, Epoch::new(0));
+        assert_eq!(prefix_mapping.mapping.len(), 0);

-    assert_eq!(prefix_mapping.epoch, current_epoch);
-    assert_ne!(prefix_mapping.mapping.len(), 0);
+        let current_epoch = Epoch::new(54321);
+        let node_ids = prefix_mapping
+            .get_target_nodes::<MainnetEthSpec>(&SubnetId::new(1), current_epoch)
+            .unwrap();
+        assert!(!node_ids.is_empty());
+
+        assert_eq!(prefix_mapping.epoch, current_epoch);
+        assert_ne!(prefix_mapping.mapping.len(), 0);
+    }
 }

From f9ccc5f1cdd638059642b16c0f456ccf82caa254 Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Tue, 26 Dec 2023 16:15:08 +0900
Subject: [PATCH 15/18] Tweak

---
 .../lighthouse_network/src/discovery/mod.rs | 35 +++++++++----------
 1 file changed, 17 insertions(+), 18 deletions(-)

diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs
index 98248ced8cc..1f4874575c3 100644
--- a/beacon_node/lighthouse_network/src/discovery/mod.rs
+++ b/beacon_node/lighthouse_network/src/discovery/mod.rs
@@ -762,29 +762,28 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
                 // Target node
                 let target_node = match &prefix_query.target {
                     DiscoveryTarget::Random => unreachable!("target should be Prefix here"),
+                    DiscoveryTarget::Prefix(node_ids) if node_ids.is_empty() => {
+                        warn!(
+                            self.log,
+                            "No NodeIds given for prefix search, falling back to random search.";
+                            "subnet" => ?prefix_query,
+                        );
+                        NodeId::random()
+                    }
                     DiscoveryTarget::Prefix(node_ids) => {
-                        if node_ids.is_empty() {
+                        let pos = prefix_query.retries % node_ids.len();
+                        if let Some(id) = node_ids.get(pos) {
+                            id.clone()
+                        } else {
                             warn!(
                                 self.log,
-                                "No NodeIds given for prefix search, falling back to random search.";
+                                "NodeIds were given for prefix search, but the offset was wrong. Choosing the first one instead.";
                                 "subnet" => ?prefix_query,
                             );
-                            NodeId::random()
-                        } else {
-                            let pos = prefix_query.retries % node_ids.len();
-                            if let Some(id) = node_ids.get(pos) {
-                                id.clone()
-                            } else {
-                                warn!(
-                                    self.log,
-                                    "NodeIds were given for prefix search, but the offset was wrong. Choosing the first one instead.";
-                                    "subnet" => ?prefix_query,
-                                );
-                                node_ids
-                                    .first()
-                                    .expect("Already checked that `node_ids` is not empty.")
-                                    .clone()
-                            }
+                            node_ids
+                                .first()
+                                .expect("Already checked that `node_ids` is not empty.")
+                                .clone()
                         }
                     }
                 };

From 47a82f7c2f83405bc2d9a96eb1c9c47a352afcf3 Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Wed, 27 Dec 2023 11:14:28 +0900
Subject: [PATCH 16/18] Fix clippy warnings

---
 consensus/types/src/subnet_id.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/consensus/types/src/subnet_id.rs b/consensus/types/src/subnet_id.rs
index ba534c69331..68a07914710 100644
--- a/consensus/types/src/subnet_id.rs
+++ b/consensus/types/src/subnet_id.rs
@@ -130,19 +130,19 @@ impl SubnetId {
     }

     /// Compute a mapping of `SubnetId` to `NodeId`s, which can be used for subnet discovery searches.
+    #[allow(clippy::arithmetic_side_effects)]
     pub fn compute_prefix_mapping_for_epoch<T: EthSpec>(
         epoch: Epoch,
         spec: &ChainSpec,
     ) -> Result<HashMap<SubnetId, Vec<ethereum_types::U256>>, &'static str> {
         // simplify variable naming
-        let prefix_bits = spec.attestation_subnet_prefix_bits as u32;
-        let shuffling_prefix_bits = spec.attestation_subnet_shuffling_prefix_bits as u32;
+        let subnet_prefix_bits = spec.attestation_subnet_prefix_bits as u32
+            + spec.attestation_subnet_shuffling_prefix_bits as u32;

         let mut mapping = HashMap::new();

-        for i in 0..2_i32.pow(prefix_bits + shuffling_prefix_bits) {
-            let node_id =
-                ethereum_types::U256::from(i) << (256 - (prefix_bits + shuffling_prefix_bits));
+        for i in 0..2_i32.pow(subnet_prefix_bits) {
+            let node_id = ethereum_types::U256::from(i) << (256 - subnet_prefix_bits);
             let (subnets, _) = Self::compute_subnets_for_epoch::<T>(node_id, epoch, spec)?;

             for subnet_id in subnets {

From 2984456f8a96d134f8943f444ae0ab26f5266f51 Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Wed, 27 Dec 2023 11:35:47 +0900
Subject: [PATCH 17/18] Tweak comments

---
 beacon_node/lighthouse_network/src/discovery/mod.rs | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs
index 1f4874575c3..2945b796c8f 100644
--- a/beacon_node/lighthouse_network/src/discovery/mod.rs
+++ b/beacon_node/lighthouse_network/src/discovery/mod.rs
@@ -734,7 +734,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
             .partition(|q| q.target == DiscoveryTarget::Random);

         if !random_search.is_empty() {
-            // build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate
+            // Build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate
             let subnet_predicate = subnet_predicate::<TSpec>(
                 random_search.iter().map(|q| q.subnet).collect::<Vec<_>>(),
                 &self.log,
@@ -755,10 +755,6 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
         if !prefix_search.is_empty() {
             // Split the grouped subnet query into individual queries in order to prefix search.
             for prefix_query in prefix_search {
-                // build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate
-                let subnet_predicate =
-                    subnet_predicate::<TSpec>(vec![prefix_query.subnet], &self.log);
-
                 // Target node
                 let target_node = match &prefix_query.target {
                     DiscoveryTarget::Random => unreachable!("target should be Prefix here"),
@@ -787,6 +783,9 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
                         }
                     }
                 };
+                // Build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate
+                let subnet_predicate =
+                    subnet_predicate::<TSpec>(vec![prefix_query.subnet], &self.log);
                 debug!(
                     self.log,
                     "Starting prefix search query";

From b832cf008c9dbfc96b7e71a58b8be1d58cc07b18 Mon Sep 17 00:00:00 2001
From: ackintosh
Date: Thu, 4 Jan 2024 11:34:13 +0900
Subject: [PATCH 18/18] Fix clippy warnings

---
 beacon_node/lighthouse_network/src/discovery/mod.rs | 5 ++---
 consensus/types/src/subnet_id.rs                    | 2 +-
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs
index 2945b796c8f..8477758cfcf 100644
--- a/beacon_node/lighthouse_network/src/discovery/mod.rs
+++ b/beacon_node/lighthouse_network/src/discovery/mod.rs
@@ -769,17 +769,16 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
                     DiscoveryTarget::Prefix(node_ids) => {
                         let pos = prefix_query.retries % node_ids.len();
                         if let Some(id) = node_ids.get(pos) {
-                            id.clone()
+                            *id
                         } else {
                             warn!(
                                 self.log,
                                 "NodeIds were given for prefix search, but the offset was wrong. Choosing the first one instead.";
                                 "subnet" => ?prefix_query,
                             );
-                            node_ids
+                            *node_ids
                                 .first()
                                 .expect("Already checked that `node_ids` is not empty.")
-                                .clone()
                         }
                     }
                 };
diff --git a/consensus/types/src/subnet_id.rs b/consensus/types/src/subnet_id.rs
index 68a07914710..905d8eb7166 100644
--- a/consensus/types/src/subnet_id.rs
+++ b/consensus/types/src/subnet_id.rs
@@ -282,7 +282,7 @@ mod tests {
             assert_eq!(mapping.len(), 64);

             for (_, node_ids) in mapping.into_iter() {
-                assert!(node_ids.len() > 0);
+                assert!(!node_ids.is_empty());
             }
         }
     }
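Note (not part of the patch series): a minimal sketch of how the pieces above fit together once the `--prefix-search-for-subnet` flag is enabled. The helper name, the `use` paths, and the fallback on error are assumptions for illustration; `PrefixMapping::get_target_nodes`, `DiscoveryTarget`, and the shuffle are the ones introduced in the diffs.

use crate::prefix_mapping::PrefixMapping; // crate-private type in beacon_node/network (path assumed)
use lighthouse_network::types::subnet::DiscoveryTarget; // re-export path assumed
use rand::seq::SliceRandom;
use types::{Epoch, MainnetEthSpec, SubnetId};

// Hypothetical helper mirroring the logic added to `NetworkService::on_attestation_service_msg`:
// look up the epoch's SubnetId -> NodeId prefix mapping, shuffle the candidates so repeated
// queries do not always start from the same prefix, and hand them to discovery as a target.
fn discovery_target_for_subnet(
    prefix_mapping: &mut PrefixMapping,
    subnet_id: &SubnetId,
    current_epoch: Epoch,
) -> DiscoveryTarget {
    match prefix_mapping.get_target_nodes::<MainnetEthSpec>(subnet_id, current_epoch) {
        Ok(mut node_ids) => {
            node_ids.shuffle(&mut rand::thread_rng());
            DiscoveryTarget::Prefix(node_ids)
        }
        // Assumed fallback: if the mapping cannot be computed, fall back to a random walk.
        Err(_) => DiscoveryTarget::Random,
    }
}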