diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index e0fc518d46c..ad7cb3081ea 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -273,7 +273,7 @@ pub async fn publish_block Result, &'static str>; /// The peerdas custody subnet count associated with the ENR. - fn custody_subnet_count(&self, spec: &ChainSpec) -> u64; + fn custody_subnet_count(&self, spec: &ChainSpec) -> Result; fn eth2(&self) -> Result; } @@ -64,14 +64,17 @@ impl Eth2Enr for Enr { .map_err(|_| "Could not decode the ENR syncnets bitfield") } - /// if the custody value is non-existent in the ENR, then we assume the minimum custody value - /// defined in the spec. - fn custody_subnet_count(&self, spec: &ChainSpec) -> u64 { - self.get_decodable::(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) - .and_then(|r| r.ok()) - // If value supplied in ENR is invalid, fallback to `custody_requirement` - .filter(|csc| csc <= &spec.data_column_sidecar_subnet_count) - .unwrap_or(spec.custody_requirement) + fn custody_subnet_count(&self, spec: &ChainSpec) -> Result { + let csc = self + .get_decodable::(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) + .ok_or("ENR custody subnet count non-existent")? + .map_err(|_| "Could not decode the ENR custody subnet count")?; + + if csc >= spec.custody_requirement && csc <= spec.data_column_sidecar_subnet_count { + Ok(csc) + } else { + Err("Invalid custody subnet count in ENR") + } } fn eth2(&self) -> Result { @@ -335,7 +338,7 @@ mod test { let enr = build_enr_with_config(config, &spec).0; assert_eq!( - enr.custody_subnet_count::(&spec), + enr.custody_subnet_count::(&spec).unwrap(), spec.custody_requirement, ); } @@ -350,31 +353,11 @@ mod test { let enr = build_enr_with_config(config, &spec).0; assert_eq!( - enr.custody_subnet_count::(&spec), + enr.custody_subnet_count::(&spec).unwrap(), spec.data_column_sidecar_subnet_count, ); } - #[test] - fn custody_subnet_count_fallback_default() { - let config = NetworkConfig::default(); - let spec = make_eip7594_spec(); - let (mut enr, enr_key) = build_enr_with_config(config, &spec); - let invalid_subnet_count = 999u64; - - enr.insert( - PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, - &invalid_subnet_count, - &enr_key, - ) - .unwrap(); - - assert_eq!( - enr.custody_subnet_count::(&spec), - spec.custody_requirement, - ); - } - fn build_enr_with_config(config: NetworkConfig, spec: &ChainSpec) -> (Enr, CombinedKey) { let keypair = libp2p::identity::secp256k1::Keypair::generate(); let enr_key = CombinedKey::from_secp256k1(&keypair); diff --git a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs index e198b3ee17f..02ff0cc3ca4 100644 --- a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs +++ b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs @@ -16,7 +16,6 @@ where E: EthSpec, { let log_clone = log.clone(); - let spec_clone = spec.clone(); move |enr: &Enr| { let attestation_bitfield: EnrAttestationBitfield = match enr.attestation_bitfield::() @@ -30,8 +29,6 @@ where let sync_committee_bitfield: Result, _> = enr.sync_committee_bitfield::(); - let custody_subnet_count = enr.custody_subnet_count::(&spec_clone); - let predicate = subnets.iter().any(|subnet| match subnet { Subnet::Attestation(s) => attestation_bitfield .get(*s.deref() as usize) @@ -40,12 +37,16 @@ where .as_ref() .map_or(false, |b| b.get(*s.deref() as usize).unwrap_or(false)), Subnet::DataColumn(s) => { - let mut 
subnets = DataColumnSubnetId::compute_custody_subnets::( - enr.node_id().raw(), - custody_subnet_count, - &spec, - ); - subnets.contains(s) + if let Ok(custody_subnet_count) = enr.custody_subnet_count::(&spec) { + DataColumnSubnetId::compute_custody_subnets::( + enr.node_id().raw(), + custody_subnet_count, + &spec, + ) + .map_or(false, |mut subnets| subnets.contains(s)) + } else { + false + } } }); diff --git a/beacon_node/lighthouse_network/src/metrics.rs b/beacon_node/lighthouse_network/src/metrics.rs index 85da8dc2112..c3f64a5a1f4 100644 --- a/beacon_node/lighthouse_network/src/metrics.rs +++ b/beacon_node/lighthouse_network/src/metrics.rs @@ -91,6 +91,15 @@ pub static PEERS_PER_CLIENT: LazyLock> = LazyLock::new(|| { &["Client"], ) }); + +pub static PEERS_PER_CUSTODY_SUBNET_COUNT: LazyLock> = LazyLock::new(|| { + try_create_int_gauge_vec( + "peers_per_custody_subnet_count", + "The current count of peers by custody subnet count", + &["custody_subnet_count"], + ) +}); + pub static FAILED_ATTESTATION_PUBLISHES_PER_SUBNET: LazyLock> = LazyLock::new(|| { try_create_int_gauge_vec( diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 31ff8bdfc23..4d913312354 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -19,7 +19,7 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use types::{EthSpec, SyncSubnetId}; +use types::{DataColumnSubnetId, EthSpec, SyncSubnetId}; pub use libp2p::core::Multiaddr; pub use libp2p::identity::Keypair; @@ -33,7 +33,7 @@ pub use peerdb::peer_info::{ }; use peerdb::score::{PeerAction, ReportSource}; pub use peerdb::sync_status::{SyncInfo, SyncStatus}; -use std::collections::{hash_map::Entry, HashMap}; +use std::collections::{hash_map::Entry, HashMap, HashSet}; use std::net::IpAddr; use strum::IntoEnumIterator; @@ -701,6 +701,8 @@ impl PeerManager { /// Received a metadata response from a peer. pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData) { + let mut invalid_meta_data = false; + if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) { if let Some(known_meta_data) = &peer_info.meta_data() { if *known_meta_data.seq_number() < *meta_data.seq_number() { @@ -717,12 +719,39 @@ impl PeerManager { debug!(self.log, "Obtained peer's metadata"; "peer_id" => %peer_id, "new_seq_no" => meta_data.seq_number()); } - let node_id_opt = peer_id_to_node_id(peer_id).ok(); - peer_info.set_meta_data(meta_data, node_id_opt, &self.network_globals.spec); + + let custody_subnet_count_opt = meta_data.custody_subnet_count().copied().ok(); + peer_info.set_meta_data(meta_data); + + if self.network_globals.spec.is_peer_das_scheduled() { + // Gracefully ignore metadata/v2 peers. Potentially downscore after PeerDAS to + // prioritize PeerDAS peers. 
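// A rough sketch (not patch lines) of how `custody_subnet_count_opt` is derived above:
// only `MetaDataV3` carries the field, so peers still on metadata V1/V2 yield `None`
// here and are simply left without computed custody subnets.
//
//     let custody_subnet_count_opt = match &meta_data {
//         MetaData::V3(md) => Some(md.custody_subnet_count),
//         _ => None, // pre-PeerDAS metadata
//     };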
+ if let Some(custody_subnet_count) = custody_subnet_count_opt { + match self.compute_peer_custody_subnets(peer_id, custody_subnet_count) { + Ok(custody_subnets) => { + peer_info.set_custody_subnets(custody_subnets); + } + Err(err) => { + debug!(self.log, "Unable to compute peer custody subnets from metadata"; + "info" => "Sending goodbye to peer", + "peer_id" => %peer_id, + "custody_subnet_count" => custody_subnet_count, + "error" => ?err, + ); + invalid_meta_data = true; + } + }; + } + } } else { error!(self.log, "Received METADATA from an unknown peer"; "peer_id" => %peer_id); } + + // Disconnect peers with invalid metadata and find other peers instead. + if invalid_meta_data { + self.goodbye_peer(peer_id, GoodbyeReason::Fault, ReportSource::PeerManager) + } } /// Updates the gossipsub scores for all known peers in gossipsub. @@ -1290,6 +1319,7 @@ impl PeerManager { let mut peers_connected = 0; let mut clients_per_peer = HashMap::new(); let mut peers_connected_mutli: HashMap<(&str, &str), i32> = HashMap::new(); + let mut peers_per_custody_subnet_count: HashMap = HashMap::new(); for (_, peer_info) in self.network_globals.peers.read().connected_peers() { peers_connected += 1; @@ -1320,11 +1350,26 @@ impl PeerManager { *peers_connected_mutli .entry((direction, transport)) .or_default() += 1; + + if let Some(MetaData::V3(meta_data)) = peer_info.meta_data() { + *peers_per_custody_subnet_count + .entry(meta_data.custody_subnet_count) + .or_default() += 1; + } } // PEERS_CONNECTED metrics::set_gauge(&metrics::PEERS_CONNECTED, peers_connected); + // CUSTODY_SUBNET_COUNT + for (custody_subnet_count, peer_count) in peers_per_custody_subnet_count.into_iter() { + metrics::set_gauge_vec( + &metrics::PEERS_PER_CUSTODY_SUBNET_COUNT, + &[&custody_subnet_count.to_string()], + peer_count, + ) + } + // PEERS_PER_CLIENT for client_kind in ClientKind::iter() { let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0); @@ -1348,6 +1393,45 @@ impl PeerManager { } } } + + fn compute_peer_custody_subnets( + &self, + peer_id: &PeerId, + custody_subnet_count: u64, + ) -> Result, String> { + // If we don't have a node id, we cannot compute the custody duties anyway + let node_id = peer_id_to_node_id(peer_id)?; + let spec = &self.network_globals.spec; + + if !(spec.custody_requirement..=spec.data_column_sidecar_subnet_count) + .contains(&custody_subnet_count) + { + return Err("Invalid custody subnet count in metadata: out of range".to_string()); + } + + let custody_subnets = DataColumnSubnetId::compute_custody_subnets::( + node_id.raw(), + custody_subnet_count, + spec, + ) + .map(|subnets| subnets.collect()) + .unwrap_or_else(|e| { + // This is an unreachable scenario unless there's a bug, as we've validated the csc + // just above. 
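// (Concretely: with the new signature, `compute_custody_subnets` only returns
// `Err(Error::InvalidCustodySubnetCount)` when the count exceeds
// `data_column_sidecar_subnet_count`, and the range check above already rejects that,
// so this closure is not expected to run.)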
+ error!( + self.log, + "Computing peer custody subnets failed unexpectedly"; + "info" => "Falling back to default custody requirement subnets", + "peer_id" => %peer_id, + "custody_subnet_count" => custody_subnet_count, + "error" => ?e + ); + DataColumnSubnetId::compute_custody_requirement_subnets::(node_id.raw(), spec) + .collect() + }); + + Ok(custody_subnets) + } } enum ConnectingType { @@ -1680,11 +1764,7 @@ mod tests { .write() .peer_info_mut(&peer0) .unwrap() - .set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + .set_meta_data(MetaData::V2(metadata)); peer_manager .network_globals .peers @@ -1704,11 +1784,7 @@ mod tests { .write() .peer_info_mut(&peer2) .unwrap() - .set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + .set_meta_data(MetaData::V2(metadata)); peer_manager .network_globals .peers @@ -1728,11 +1804,7 @@ mod tests { .write() .peer_info_mut(&peer4) .unwrap() - .set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + .set_meta_data(MetaData::V2(metadata)); peer_manager .network_globals .peers @@ -1806,11 +1878,7 @@ mod tests { .write() .peer_info_mut(&peer) .unwrap() - .set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + .set_meta_data(MetaData::V2(metadata)); peer_manager .network_globals .peers @@ -1934,11 +2002,7 @@ mod tests { .write() .peer_info_mut(&peer) .unwrap() - .set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + .set_meta_data(MetaData::V2(metadata)); let long_lived_subnets = peer_manager .network_globals .peers @@ -2047,11 +2111,7 @@ mod tests { .write() .peer_info_mut(&peer) .unwrap() - .set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + .set_meta_data(MetaData::V2(metadata)); let long_lived_subnets = peer_manager .network_globals .peers @@ -2217,11 +2277,7 @@ mod tests { .write() .peer_info_mut(&peer) .unwrap() - .set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + .set_meta_data(MetaData::V2(metadata)); let long_lived_subnets = peer_manager .network_globals .peers @@ -2378,11 +2434,7 @@ mod tests { let mut peer_db = peer_manager.network_globals.peers.write(); let peer_info = peer_db.peer_info_mut(&condition.peer_id).unwrap(); - peer_info.set_meta_data( - MetaData::V2(metadata), - None, - &peer_manager.network_globals.spec, - ); + peer_info.set_meta_data(MetaData::V2(metadata)); peer_info.set_gossipsub_score(condition.gossipsub_score); peer_info.add_to_score(condition.score); diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 6e76fd4bb00..f6b63e6de22 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -1,8 +1,6 @@ use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY; use crate::discovery::CombinedKey; -use crate::{ - metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Eth2Enr, Gossipsub, PeerId, -}; +use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId}; use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; use rand::seq::SliceRandom; use score::{PeerAction, ReportSource, Score, ScoreState}; @@ -47,16 +45,10 @@ pub struct PeerDB { disable_peer_scoring: bool, /// PeerDB's logger log: slog::Logger, - spec: ChainSpec, } impl PeerDB { - 
pub fn new( - trusted_peers: Vec, - disable_peer_scoring: bool, - log: &slog::Logger, - spec: ChainSpec, - ) -> Self { + pub fn new(trusted_peers: Vec, disable_peer_scoring: bool, log: &slog::Logger) -> Self { // Initialize the peers hashmap with trusted peers let peers = trusted_peers .into_iter() @@ -68,7 +60,6 @@ impl PeerDB { banned_peers_count: BannedPeersCount::default(), disable_peer_scoring, peers, - spec, } } @@ -726,6 +717,14 @@ impl PeerDB { }, ); + if supernode { + let peer_info = self.peers.get_mut(&peer_id).expect("peer exists"); + let all_subnets = (0..spec.data_column_sidecar_subnet_count) + .map(|csc| csc.into()) + .collect(); + peer_info.set_custody_subnets(all_subnets); + } + peer_id } @@ -791,14 +790,6 @@ impl PeerDB { ) => { // Update the ENR if one exists, and compute the custody subnets if let Some(enr) = enr { - let custody_subnet_count = enr.custody_subnet_count::(&self.spec); - let custody_subnets = DataColumnSubnetId::compute_custody_subnets::( - enr.node_id().raw(), - custody_subnet_count, - &self.spec, - ) - .collect::>(); - info.set_custody_subnets(custody_subnets); info.set_enr(enr); } @@ -1349,8 +1340,7 @@ mod tests { fn get_db() -> PeerDB { let log = build_log(slog::Level::Debug, false); - let spec = M::default_spec(); - PeerDB::new(vec![], false, &log, spec) + PeerDB::new(vec![], false, &log) } #[test] @@ -2049,8 +2039,7 @@ mod tests { fn test_trusted_peers_score() { let trusted_peer = PeerId::random(); let log = build_log(slog::Level::Debug, false); - let spec = M::default_spec(); - let mut pdb: PeerDB = PeerDB::new(vec![trusted_peer], false, &log, spec); + let mut pdb: PeerDB = PeerDB::new(vec![trusted_peer], false, &log); pdb.connect_ingoing(&trusted_peer, "/ip4/0.0.0.0".parse().unwrap(), None); @@ -2074,8 +2063,7 @@ mod tests { fn test_disable_peer_scoring() { let peer = PeerId::random(); let log = build_log(slog::Level::Debug, false); - let spec = M::default_spec(); - let mut pdb: PeerDB = PeerDB::new(vec![], true, &log, spec); + let mut pdb: PeerDB = PeerDB::new(vec![], true, &log); pdb.connect_ingoing(&peer, "/ip4/0.0.0.0".parse().unwrap(), None); diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index 1ea3f8ed5fc..ee8c27f474c 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -3,7 +3,6 @@ use super::score::{PeerAction, Score, ScoreState}; use super::sync_status::SyncStatus; use crate::discovery::Eth2Enr; use crate::{rpc::MetaData, types::Subnet}; -use discv5::enr::NodeId; use discv5::Enr; use libp2p::core::multiaddr::{Multiaddr, Protocol}; use serde::{ @@ -14,7 +13,7 @@ use std::collections::HashSet; use std::net::IpAddr; use std::time::Instant; use strum::AsRefStr; -use types::{ChainSpec, DataColumnSubnetId, EthSpec}; +use types::{DataColumnSubnetId, EthSpec}; use PeerConnectionStatus::*; /// Information about a given connected peer. @@ -358,31 +357,7 @@ impl PeerInfo { /// Sets an explicit value for the meta data. 
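/// (Custody subnets are no longer derived here from the metadata; the peer manager now
/// computes them via `compute_peer_custody_subnets` and stores them separately through
/// `set_custody_subnets`.)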
// VISIBILITY: The peer manager is able to adjust the meta_data - pub(in crate::peer_manager) fn set_meta_data( - &mut self, - meta_data: MetaData, - node_id_opt: Option, - spec: &ChainSpec, - ) { - // If we don't have a node id, we cannot compute the custody duties anyway - let Some(node_id) = node_id_opt else { - self.meta_data = Some(meta_data); - return; - }; - - // Already set by enr if custody_subnets is non empty - if self.custody_subnets.is_empty() { - if let Ok(custody_subnet_count) = meta_data.custody_subnet_count() { - let custody_subnets = DataColumnSubnetId::compute_custody_subnets::( - node_id.raw(), - std::cmp::min(*custody_subnet_count, spec.data_column_sidecar_subnet_count), - spec, - ) - .collect::>(); - self.set_custody_subnets(custody_subnets); - } - } - + pub(in crate::peer_manager) fn set_meta_data(&mut self, meta_data: MetaData) { self.meta_data = Some(meta_data); } @@ -391,7 +366,10 @@ impl PeerInfo { self.connection_status = connection_status } - pub(super) fn set_custody_subnets(&mut self, custody_subnets: HashSet) { + pub(in crate::peer_manager) fn set_custody_subnets( + &mut self, + custody_subnets: HashSet, + ) { self.custody_subnets = custody_subnets } diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index c76e0a18577..ac78e2cb01e 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -2,9 +2,10 @@ use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV3}; use crate::types::{BackFillState, SyncState}; +use crate::Client; use crate::EnrExt; -use crate::{Client, Eth2Enr}; use crate::{Enr, GossipTopic, Multiaddr, PeerId}; +use itertools::Itertools; use parking_lot::RwLock; use std::collections::HashSet; use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, EthSpec}; @@ -26,6 +27,9 @@ pub struct NetworkGlobals { pub sync_state: RwLock, /// The current state of the backfill sync. pub backfill_state: RwLock, + /// The computed custody subnets and columns is stored to avoid re-computing. 
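// Sketch of the intended relationship between the two fields below (assuming the spec's
// uniform mapping of columns to subnets): `custody_columns` is the sorted union of the
// columns of every subnet in `custody_subnets`, i.e.
//
//     custody_columns.len()
//         == custody_subnets.len() * (number_of_columns / data_column_sidecar_subnet_count)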
+ pub custody_subnets: Vec, + pub custody_columns: Vec, pub spec: ChainSpec, } @@ -38,20 +42,39 @@ impl NetworkGlobals { log: &slog::Logger, spec: ChainSpec, ) -> Self { + let (custody_subnets, custody_columns) = if spec.is_peer_das_scheduled() { + let custody_subnet_count = local_metadata + .custody_subnet_count() + .copied() + .expect("custody subnet count must be set if PeerDAS is scheduled"); + let custody_subnets = DataColumnSubnetId::compute_custody_subnets::( + enr.node_id().raw(), + custody_subnet_count, + &spec, + ) + .expect("custody subnet count must be valid") + .collect::>(); + let custody_columns = custody_subnets + .iter() + .flat_map(|subnet| subnet.columns::(&spec)) + .sorted() + .collect(); + (custody_subnets, custody_columns) + } else { + (vec![], vec![]) + }; + NetworkGlobals { local_enr: RwLock::new(enr.clone()), peer_id: RwLock::new(enr.peer_id()), listen_multiaddrs: RwLock::new(Vec::new()), local_metadata: RwLock::new(local_metadata), - peers: RwLock::new(PeerDB::new( - trusted_peers, - disable_peer_scoring, - log, - spec.clone(), - )), + peers: RwLock::new(PeerDB::new(trusted_peers, disable_peer_scoring, log)), gossipsub_subscriptions: RwLock::new(HashSet::new()), sync_state: RwLock::new(SyncState::Stalled), backfill_state: RwLock::new(BackFillState::NotRequired), + custody_subnets, + custody_columns, spec, } } @@ -118,29 +141,6 @@ impl NetworkGlobals { std::mem::replace(&mut *self.sync_state.write(), new_state) } - /// Compute custody data columns the node is assigned to custody. - pub fn custody_columns(&self) -> Vec { - let enr = self.local_enr(); - let custody_subnet_count = enr.custody_subnet_count::(&self.spec); - DataColumnSubnetId::compute_custody_columns::( - enr.node_id().raw(), - custody_subnet_count, - &self.spec, - ) - .collect() - } - - /// Compute custody data column subnets the node is assigned to custody. - pub fn custody_subnets(&self) -> impl Iterator { - let enr = self.local_enr(); - let custody_subnet_count = enr.custody_subnet_count::(&self.spec); - DataColumnSubnetId::compute_custody_subnets::( - enr.node_id().raw(), - custody_subnet_count, - &self.spec, - ) - } - /// Returns a connected peer that: /// 1. is connected /// 2. 
assigned to custody the column based on it's `custody_subnet_count` from ENR or metadata @@ -161,44 +161,70 @@ impl NetworkGlobals { trusted_peers: Vec, log: &slog::Logger, spec: ChainSpec, + ) -> NetworkGlobals { + let metadata = MetaData::V3(MetaDataV3 { + seq_number: 0, + attnets: Default::default(), + syncnets: Default::default(), + custody_subnet_count: spec.custody_requirement, + }); + Self::new_test_globals_with_metadata(trusted_peers, metadata, log, spec) + } + + pub(crate) fn new_test_globals_with_metadata( + trusted_peers: Vec, + metadata: MetaData, + log: &slog::Logger, + spec: ChainSpec, ) -> NetworkGlobals { use crate::CombinedKeyExt; let keypair = libp2p::identity::secp256k1::Keypair::generate(); let enr_key: discv5::enr::CombinedKey = discv5::enr::CombinedKey::from_secp256k1(&keypair); let enr = discv5::enr::Enr::builder().build(&enr_key).unwrap(); - NetworkGlobals::new( - enr, - MetaData::V3(MetaDataV3 { - seq_number: 0, - attnets: Default::default(), - syncnets: Default::default(), - custody_subnet_count: spec.data_column_sidecar_subnet_count, - }), - trusted_peers, - false, - log, - spec, - ) + NetworkGlobals::new(enr, metadata, trusted_peers, false, log, spec) } } #[cfg(test)] mod test { use super::*; - use types::{EthSpec, MainnetEthSpec as E}; + use types::{Epoch, EthSpec, MainnetEthSpec as E}; + + #[test] + fn test_custody_subnets() { + let log = logging::test_logger(); + let mut spec = E::default_spec(); + spec.eip7594_fork_epoch = Some(Epoch::new(0)); + + let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2; + let metadata = get_metadata(custody_subnet_count); + + let globals = + NetworkGlobals::::new_test_globals_with_metadata(vec![], metadata, &log, spec); + assert_eq!(globals.custody_subnets.len(), custody_subnet_count as usize); + } #[test] - fn test_custody_count_default() { - let spec = E::default_spec(); + fn test_custody_columns() { let log = logging::test_logger(); - let default_custody_requirement_column_count = spec.number_of_columns as u64 - / spec.data_column_sidecar_subnet_count - * spec.custody_requirement; - let globals = NetworkGlobals::::new_test_globals(vec![], &log, spec.clone()); - let columns = globals.custody_columns(); - assert_eq!( - columns.len(), - default_custody_requirement_column_count as usize - ); + let mut spec = E::default_spec(); + spec.eip7594_fork_epoch = Some(Epoch::new(0)); + + let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2; + let custody_columns_count = spec.number_of_columns / 2; + let metadata = get_metadata(custody_subnet_count); + + let globals = + NetworkGlobals::::new_test_globals_with_metadata(vec![], metadata, &log, spec); + assert_eq!(globals.custody_columns.len(), custody_columns_count); + } + + fn get_metadata(custody_subnet_count: u64) -> MetaData { + MetaData::V3(MetaDataV3 { + seq_number: 0, + attnets: Default::default(), + syncnets: Default::default(), + custody_subnet_count, + }) } } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 5b9a3125ea5..5782fb00b6c 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -16,7 +16,6 @@ use futures::prelude::*; use futures::StreamExt; use lighthouse_network::service::Network; use lighthouse_network::types::GossipKind; -use lighthouse_network::Eth2Enr; use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance}; use lighthouse_network::{ rpc::{GoodbyeReason, RPCResponseErrorCode}, @@ -808,17 +807,9 @@ impl NetworkService { } } } else { 
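// The node's own custody subnets are now read from the precomputed
// `network_globals.custody_subnets` (built once in `NetworkGlobals::new`) rather than
// recomputed from the local ENR on every subscription pass.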
- for column_subnet in DataColumnSubnetId::compute_custody_subnets::( - self.network_globals.local_enr().node_id().raw(), - self.network_globals - .local_enr() - .custody_subnet_count::<::EthSpec>( - &self.fork_context.spec, - ), - &self.fork_context.spec, - ) { + for column_subnet in &self.network_globals.custody_subnets { for fork_digest in self.required_gossip_fork_digests() { - let gossip_kind = Subnet::DataColumn(column_subnet).into(); + let gossip_kind = Subnet::DataColumn(*column_subnet).into(); let topic = GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest); if self.libp2p.subscribe(topic.clone()) { diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 1cf028dbcd8..b9f6d180c13 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -389,7 +389,7 @@ impl SyncNetworkContext { let (expects_custody_columns, num_of_custody_column_req) = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let custody_indexes = self.network_globals().custody_columns(); + let custody_indexes = self.network_globals().custody_columns.clone(); let mut num_of_custody_column_req = 0; for (peer_id, columns_by_range_request) in @@ -758,10 +758,11 @@ impl SyncNetworkContext { .imported_custody_column_indexes(&block_root) .unwrap_or_default(); - let custody_indexes_duty = self.network_globals().custody_columns(); - // Include only the blob indexes not yet imported (received through gossip) - let custody_indexes_to_fetch = custody_indexes_duty + let custody_indexes_to_fetch = self + .network_globals() + .custody_columns + .clone() .into_iter() .filter(|index| !custody_indexes_imported.contains(index)) .collect::>(); diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index 1756fb513da..ed5946ada72 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -1112,25 +1112,25 @@ impl SyncingChain { fn good_peers_on_custody_subnets(&self, epoch: Epoch, network: &SyncNetworkContext) -> bool { if network.chain.spec.is_peer_das_enabled_for_epoch(epoch) { // Require peers on all custody column subnets before sending batches - let peers_on_all_custody_subnets = - network - .network_globals() - .custody_subnets() - .all(|subnet_id| { - let peer_count = network - .network_globals() - .peers - .read() - .good_custody_subnet_peer(subnet_id) - .count(); - - set_int_gauge( - &PEERS_PER_COLUMN_SUBNET, - &[&subnet_id.to_string()], - peer_count as i64, - ); - peer_count > 0 - }); + let peers_on_all_custody_subnets = network + .network_globals() + .custody_subnets + .iter() + .all(|subnet_id| { + let peer_count = network + .network_globals() + .peers + .read() + .good_custody_subnet_peer(*subnet_id) + .count(); + + set_int_gauge( + &PEERS_PER_COLUMN_SUBNET, + &[&subnet_id.to_string()], + peer_count as i64, + ); + peer_count > 0 + }); peers_on_all_custody_subnets } else { true diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs index df964cf8de7..df61d711c19 100644 --- a/consensus/types/src/data_column_subnet_id.rs +++ b/consensus/types/src/data_column_subnet_id.rs @@ -41,9 +41,10 @@ impl DataColumnSubnetId { raw_node_id: [u8; 32], custody_subnet_count: u64, spec: &ChainSpec, - ) -> impl Iterator { - // TODO(das): we could perform check on `custody_subnet_count` here to ensure that it is a valid - // value, 
but here we assume it is valid. + ) -> Result, Error> { + if custody_subnet_count > spec.data_column_sidecar_subnet_count { + return Err(Error::InvalidCustodySubnetCount(custody_subnet_count)); + } let mut subnets: HashSet = HashSet::new(); let mut current_id = U256::from_be_slice(&raw_node_id); @@ -66,17 +67,26 @@ impl DataColumnSubnetId { } current_id += U256::from(1u64) } - subnets.into_iter().map(DataColumnSubnetId::new) + Ok(subnets.into_iter().map(DataColumnSubnetId::new)) + } + + /// Compute the custody subnets for a given node id with the default `custody_requirement`. + /// This operation should be infallable, and empty iterator is returned if it fails unexpectedly. + pub fn compute_custody_requirement_subnets( + node_id: [u8; 32], + spec: &ChainSpec, + ) -> impl Iterator { + Self::compute_custody_subnets::(node_id, spec.custody_requirement, spec) + .expect("should compute default custody subnets") } pub fn compute_custody_columns( raw_node_id: [u8; 32], custody_subnet_count: u64, spec: &ChainSpec, - ) -> impl Iterator { + ) -> Result, Error> { Self::compute_custody_subnets::(raw_node_id, custody_subnet_count, spec) - .flat_map(|subnet| subnet.columns::(spec)) - .sorted() + .map(|subnet| subnet.flat_map(|subnet| subnet.columns::(spec)).sorted()) } } @@ -121,6 +131,7 @@ impl From<&DataColumnSubnetId> for u64 { #[derive(Debug)] pub enum Error { ArithError(ArithError), + InvalidCustodySubnetCount(u64), } impl From for Error { @@ -132,9 +143,9 @@ impl From for Error { #[cfg(test)] mod test { use crate::data_column_subnet_id::DataColumnSubnetId; - use crate::EthSpec; use crate::MainnetEthSpec; use crate::Uint256; + use crate::{EthSpec, GnosisEthSpec, MinimalEthSpec}; type E = MainnetEthSpec; @@ -163,7 +174,8 @@ mod test { node_id, custody_requirement, &spec, - ); + ) + .unwrap(); let computed_subnets: Vec<_> = computed_subnets.collect(); // the number of subnets is equal to the custody requirement @@ -183,6 +195,21 @@ mod test { } } + #[test] + fn test_compute_custody_requirement_subnets_never_panics() { + let node_id = [1u8; 32]; + test_compute_custody_requirement_subnets_with_spec::(node_id); + test_compute_custody_requirement_subnets_with_spec::(node_id); + test_compute_custody_requirement_subnets_with_spec::(node_id); + } + + fn test_compute_custody_requirement_subnets_with_spec(node_id: [u8; 32]) { + let _ = DataColumnSubnetId::compute_custody_requirement_subnets::( + node_id, + &E::default_spec(), + ); + } + #[test] fn test_columns_subnet_conversion() { let spec = E::default_spec(); diff --git a/testing/ef_tests/src/cases/get_custody_columns.rs b/testing/ef_tests/src/cases/get_custody_columns.rs index d31e72a473d..9665f877300 100644 --- a/testing/ef_tests/src/cases/get_custody_columns.rs +++ b/testing/ef_tests/src/cases/get_custody_columns.rs @@ -31,6 +31,7 @@ impl Case for GetCustodyColumns { self.custody_subnet_count, &spec, ) + .expect("should compute custody columns") .collect::>(); let expected = &self.result; if computed == *expected {
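Taken together, custody computation is now fallible at the API boundary and each caller decides how to react. The sketch below is illustrative only (the helper name is invented; it assumes `MainnetEthSpec`, the signatures as they appear in this diff, and an out-of-range count as the only error case), mirroring the fallback used in `PeerManager::compute_peer_custody_subnets`:

use types::{ChainSpec, ColumnIndex, DataColumnSubnetId, MainnetEthSpec as E};

/// Derive custody columns for a node, degrading to the spec's minimum
/// `custody_requirement` subnets if the advertised count is invalid.
fn custody_columns_or_minimum(
    raw_node_id: [u8; 32],
    advertised_csc: u64,
    spec: &ChainSpec,
) -> Vec<ColumnIndex> {
    let subnets: Vec<DataColumnSubnetId> =
        match DataColumnSubnetId::compute_custody_subnets::<E>(raw_node_id, advertised_csc, spec) {
            Ok(subnets) => subnets.collect(),
            // `Err` signals an invalid count (greater than `data_column_sidecar_subnet_count`).
            Err(_) => DataColumnSubnetId::compute_custody_requirement_subnets::<E>(raw_node_id, spec)
                .collect(),
        };

    let mut columns: Vec<ColumnIndex> = subnets
        .iter()
        .flat_map(|subnet| subnet.columns::<E>(spec))
        .collect();
    columns.sort_unstable();
    columns
}

Call sites that trust their input, such as `NetworkGlobals::new` operating on the local metadata, instead use `expect`, since a failure there indicates a configuration bug rather than bad peer input.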