From 5612c5ce7e8d03afbe99a29f1cd05da569b3d7c7 Mon Sep 17 00:00:00 2001 From: jacobkaufmann Date: Wed, 13 Mar 2024 16:53:41 -0600 Subject: [PATCH 1/4] feat: add peerdas custody field to ENR --- Cargo.lock | 1 + beacon_node/lighthouse_network/Cargo.toml | 1 + .../lighthouse_network/src/discovery/enr.rs | 29 +++++++++- .../src/discovery/subnet_predicate.rs | 19 +++++-- beacon_node/network/src/service.rs | 14 ++--- consensus/types/src/data_column_subnet_id.rs | 54 ++++++++++++++----- consensus/types/src/eth_spec.rs | 10 +++- 7 files changed, 103 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 78ddf185dd3..1050faf12da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5150,6 +5150,7 @@ dependencies = [ "hex", "hex_fmt", "instant", + "itertools", "lazy_static", "libp2p", "libp2p-mplex", diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index 17114180729..24f01a6b8ab 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -47,6 +47,7 @@ tracing = { workspace = true } byteorder = { workspace = true } bytes = { workspace = true } either = { workspace = true } +itertools = { workspace = true } # Local dependencies futures-ticker = "0.0.3" diff --git a/beacon_node/lighthouse_network/src/discovery/enr.rs b/beacon_node/lighthouse_network/src/discovery/enr.rs index b0e0a01eecc..1303960a136 100644 --- a/beacon_node/lighthouse_network/src/discovery/enr.rs +++ b/beacon_node/lighthouse_network/src/discovery/enr.rs @@ -24,6 +24,8 @@ pub const ETH2_ENR_KEY: &str = "eth2"; pub const ATTESTATION_BITFIELD_ENR_KEY: &str = "attnets"; /// The ENR field specifying the sync committee subnet bitfield. pub const SYNC_COMMITTEE_BITFIELD_ENR_KEY: &str = "syncnets"; +/// The ENR field specifying the peerdas custody subnet count. +pub const PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY: &str = "custody_subnet_count"; /// Extension trait for ENR's within Eth2. pub trait Eth2Enr { @@ -37,6 +39,9 @@ pub trait Eth2Enr { &self, ) -> Result, &'static str>; + /// The peerdas custody subnet count associated with the ENR. + fn custody_subnet_count(&self) -> Result; + fn eth2(&self) -> Result; } @@ -63,6 +68,16 @@ impl Eth2Enr for Enr { .map_err(|_| "Could not decode the ENR syncnets bitfield") } + fn custody_subnet_count(&self) -> Result { + // NOTE: given that a minimum is defined, we could map a non-existent key-value to the + // minimum value. + let custody_bytes = self + .get(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) + .ok_or("ENR custody subnet countn non-existent")?; + u64::from_ssz_bytes(custody_bytes) + .map_err(|_| "Could not decode the ENR custody subnet count") + } + fn eth2(&self) -> Result { let eth2_bytes = self.get(ETH2_ENR_KEY).ok_or("ENR has no eth2 field")?; @@ -225,6 +240,14 @@ pub fn build_enr( builder.add_value(SYNC_COMMITTEE_BITFIELD_ENR_KEY, &bitfield.as_ssz_bytes()); + // set the "custody_subnet_count" field on our ENR + let custody_subnet_count = T::min_custody_requirement() as u64; + + builder.add_value( + PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, + &custody_subnet_count.as_ssz_bytes(), + ); + builder .build(enr_key) .map_err(|e| format!("Could not build Local ENR: {:?}", e)) @@ -248,10 +271,12 @@ fn compare_enr(local_enr: &Enr, disk_enr: &Enr) -> bool { // take preference over disk udp port if one is not specified && (local_enr.udp4().is_none() || local_enr.udp4() == disk_enr.udp4()) && (local_enr.udp6().is_none() || local_enr.udp6() == disk_enr.udp6()) - // we need the ATTESTATION_BITFIELD_ENR_KEY and SYNC_COMMITTEE_BITFIELD_ENR_KEY key to match, - // otherwise we use a new ENR. This will likely only be true for non-validating nodes + // we need the ATTESTATION_BITFIELD_ENR_KEY and SYNC_COMMITTEE_BITFIELD_ENR_KEY and + // PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY key to match, otherwise we use a new ENR. This will + // likely only be true for non-validating nodes. && local_enr.get(ATTESTATION_BITFIELD_ENR_KEY) == disk_enr.get(ATTESTATION_BITFIELD_ENR_KEY) && local_enr.get(SYNC_COMMITTEE_BITFIELD_ENR_KEY) == disk_enr.get(SYNC_COMMITTEE_BITFIELD_ENR_KEY) + && local_enr.get(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) == disk_enr.get(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) } /// Loads enr from the given directory diff --git a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs index 0b35465233a..c161abc39db 100644 --- a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs +++ b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs @@ -1,8 +1,10 @@ //! The subnet predicate used for searching for a particular subnet. use super::*; use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield}; +use itertools::Itertools; use slog::trace; use std::ops::Deref; +use types::DataColumnSubnetId; /// Returns the predicate for a given subnet. pub fn subnet_predicate( @@ -22,10 +24,16 @@ where }; // Pre-fork/fork-boundary enrs may not contain a syncnets field. - // Don't return early here + // Don't return early here. let sync_committee_bitfield: Result, _> = enr.sync_committee_bitfield::(); + // Pre-fork/fork-boundary enrs may not contain a peerdas custody field. + // Don't return early here. + // + // NOTE: we could map to minimum custody requirement here. + let custody_subnet_count: Result = enr.custody_subnet_count::(); + let predicate = subnets.iter().any(|subnet| match subnet { Subnet::Attestation(s) => attestation_bitfield .get(*s.deref() as usize) @@ -33,8 +41,13 @@ where Subnet::SyncCommittee(s) => sync_committee_bitfield .as_ref() .map_or(false, |b| b.get(*s.deref() as usize).unwrap_or(false)), - // TODO(das) discovery to be implemented at a later phase. Initially we just use a large peer count. - Subnet::DataColumn(_) => false, + Subnet::DataColumn(s) => custody_subnet_count.map_or(false, |count| { + let mut subnets = DataColumnSubnetId::compute_custody_subnets::( + enr.node_id().raw().into(), + count, + ); + subnets.contains(s) + }), }); if !predicate { diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 226c5f7bade..47f0696a785 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -17,6 +17,7 @@ use futures::prelude::*; use futures::StreamExt; use lighthouse_network::service::Network; use lighthouse_network::types::GossipKind; +use lighthouse_network::Eth2Enr; use lighthouse_network::{prometheus_client::registry::Registry, MessageAcceptance}; use lighthouse_network::{ rpc::{GoodbyeReason, RPCResponseErrorCode}, @@ -733,12 +734,13 @@ impl NetworkService { } if !self.subscribe_all_subnets { - for column_subnet in - DataColumnSubnetId::compute_subnets_for_data_column::( - self.network_globals.local_enr().node_id().raw().into(), - &self.beacon_chain.spec, - ) - { + for column_subnet in DataColumnSubnetId::compute_custody_subnets::( + self.network_globals.local_enr().node_id().raw().into(), + self.network_globals + .local_enr() + .custody_subnet_count::<::EthSpec>() + .unwrap_or(self.beacon_chain.spec.custody_requirement), + ) { for fork_digest in self.required_gossip_fork_digests() { let gossip_kind = Subnet::DataColumn(column_subnet).into(); let topic = GossipTopic::new( diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs index 5a42b323895..488abdc99c3 100644 --- a/consensus/types/src/data_column_subnet_id.rs +++ b/consensus/types/src/data_column_subnet_id.rs @@ -1,8 +1,9 @@ //! Identifies each data column subnet by an integer identifier. -use crate::{ChainSpec, EthSpec}; +use crate::EthSpec; use ethereum_types::U256; use safe_arith::{ArithError, SafeArith}; use serde::{Deserialize, Serialize}; +use smallvec::SmallVec; use std::fmt::{self, Display}; use std::ops::{Deref, DerefMut}; @@ -44,20 +45,47 @@ impl DataColumnSubnetId { } #[allow(clippy::arithmetic_side_effects)] + pub fn columns(&self) -> impl Iterator { + let subnet = self.0; + let data_column_subnet_count = T::data_column_subnet_count() as u64; + let columns_per_subnet = (T::number_of_columns() as u64) / data_column_subnet_count; + (0..columns_per_subnet).map(move |i| data_column_subnet_count * i + subnet) + } + /// Compute required subnets to subscribe to given the node id. /// TODO(das): Add epoch param - /// TODO(das): Add num of subnets (from ENR) - pub fn compute_subnets_for_data_column( + #[allow(clippy::arithmetic_side_effects)] + pub fn compute_custody_subnets( node_id: U256, - spec: &ChainSpec, + custody_subnet_count: u64, ) -> impl Iterator { - let num_of_column_subnets = T::data_column_subnet_count() as u64; - (0..spec.custody_requirement) - .map(move |i| { - let node_offset = (node_id % U256::from(num_of_column_subnets)).as_u64(); - node_offset.saturating_add(i) % num_of_column_subnets - }) - .map(DataColumnSubnetId::new) + // NOTE: we could perform check on `custody_subnet_count` here to ensure that it is a valid + // value, but here we assume it is valid. + + let mut subnets = SmallVec::<[u64; 32]>::new(); + let mut offset = 0; + while (subnets.len() as u64) < custody_subnet_count { + let offset_node_id = node_id + U256::from(offset); + let offset_node_id = offset_node_id.as_u64().to_le_bytes(); + let hash = ethereum_hashing::hash_fixed(&offset_node_id); + let subnet = + U256::from_little_endian(&hash).as_u64() % (T::data_column_subnet_count() as u64); + + if !subnets.contains(&subnet) { + subnets.push(subnet); + } + + offset += 1 + } + subnets.into_iter().map(DataColumnSubnetId::new) + } + + pub fn compute_custody_columns( + node_id: U256, + custody_subnet_count: u64, + ) -> impl Iterator { + Self::compute_custody_subnets::(node_id, custody_subnet_count) + .flat_map(|subnet| subnet.columns::()) } } @@ -155,9 +183,9 @@ mod test { let spec = ChainSpec::mainnet(); for x in 0..node_ids.len() { - let computed_subnets = DataColumnSubnetId::compute_subnets_for_data_column::< + let computed_subnets = DataColumnSubnetId::compute_custody_subnets::< crate::MainnetEthSpec, - >(node_ids[x], &spec); + >(node_ids[x], spec.custody_requirement); assert_eq!( expected_subnets[x], diff --git a/consensus/types/src/eth_spec.rs b/consensus/types/src/eth_spec.rs index 00bf41e8a73..79c4376b5c7 100644 --- a/consensus/types/src/eth_spec.rs +++ b/consensus/types/src/eth_spec.rs @@ -6,7 +6,7 @@ use ssz_types::typenum::{ bit::B0, UInt, Unsigned, U0, U1024, U1048576, U1073741824, U1099511627776, U128, U131072, U16, U16777216, U2, U2048, U256, U32, U4, U4096, U512, U6, U625, U64, U65536, U8, U8192, }; -use ssz_types::typenum::{U17, U9}; +use ssz_types::typenum::{U1, U17, U9}; use std::fmt::{self, Debug}; use std::str::FromStr; @@ -115,6 +115,7 @@ pub trait EthSpec: /* * New in PeerDAS */ + type MinCustodyRequirement: Unsigned + Clone + Sync + Send + Debug + PartialEq; type DataColumnSubnetCount: Unsigned + Clone + Sync + Send + Debug + PartialEq; type DataColumnCount: Unsigned + Clone + Sync + Send + Debug + PartialEq; type MaxBytesPerColumn: Unsigned + Clone + Sync + Send + Debug + PartialEq; @@ -296,6 +297,10 @@ pub trait EthSpec: Self::DataColumnCount::to_usize() } + fn min_custody_requirement() -> usize { + Self::MinCustodyRequirement::to_usize() + } + fn data_column_subnet_count() -> usize { Self::DataColumnSubnetCount::to_usize() } @@ -353,6 +358,7 @@ impl EthSpec for MainnetEthSpec { type FieldElementsPerCell = U64; type BytesPerBlob = U131072; type KzgCommitmentInclusionProofDepth = U17; + type MinCustodyRequirement = U1; type DataColumnSubnetCount = U32; type DataColumnCount = U128; // Column samples are entire columns in 1D DAS. @@ -396,6 +402,7 @@ impl EthSpec for MinimalEthSpec { type MaxBlobCommitmentsPerBlock = U16; type KzgCommitmentInclusionProofDepth = U9; // DAS spec values copied from `MainnetEthSpec` + type MinCustodyRequirement = U1; type DataColumnSubnetCount = U32; type DataColumnCount = U128; type MaxBytesPerColumn = U65536; @@ -476,6 +483,7 @@ impl EthSpec for GnosisEthSpec { type BytesPerBlob = U131072; type KzgCommitmentInclusionProofDepth = U17; // DAS spec values copied from `MainnetEthSpec` + type MinCustodyRequirement = U1; type DataColumnSubnetCount = U32; type DataColumnCount = U128; type MaxBytesPerColumn = U65536; From f1f8645a735b93831bf843e0a83f07cd18028087 Mon Sep 17 00:00:00 2001 From: jacobkaufmann Date: Thu, 14 Mar 2024 12:28:59 -0600 Subject: [PATCH 2/4] add hash prefix step in subnet computation --- consensus/types/src/data_column_subnet_id.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs index 488abdc99c3..4e25be7cf91 100644 --- a/consensus/types/src/data_column_subnet_id.rs +++ b/consensus/types/src/data_column_subnet_id.rs @@ -67,9 +67,10 @@ impl DataColumnSubnetId { while (subnets.len() as u64) < custody_subnet_count { let offset_node_id = node_id + U256::from(offset); let offset_node_id = offset_node_id.as_u64().to_le_bytes(); - let hash = ethereum_hashing::hash_fixed(&offset_node_id); - let subnet = - U256::from_little_endian(&hash).as_u64() % (T::data_column_subnet_count() as u64); + let hash: [u8; 32] = ethereum_hashing::hash_fixed(&offset_node_id); + let hash_prefix = [hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7]]; + let hash_prefix_u64 = u64::from_le_bytes(hash_prefix); + let subnet = hash_prefix_u64 % (T::data_column_subnet_count() as u64); if !subnets.contains(&subnet) { subnets.push(subnet); From d600224f802f8ccf9fc9537ada0be195f245bee0 Mon Sep 17 00:00:00 2001 From: jacobkaufmann Date: Thu, 14 Mar 2024 13:34:25 -0600 Subject: [PATCH 3/4] refactor test and fix possible u64 overflow --- consensus/types/src/data_column_subnet_id.rs | 48 ++++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs index 4e25be7cf91..f1f0843acad 100644 --- a/consensus/types/src/data_column_subnet_id.rs +++ b/consensus/types/src/data_column_subnet_id.rs @@ -66,9 +66,11 @@ impl DataColumnSubnetId { let mut offset = 0; while (subnets.len() as u64) < custody_subnet_count { let offset_node_id = node_id + U256::from(offset); - let offset_node_id = offset_node_id.as_u64().to_le_bytes(); + let offset_node_id = offset_node_id.low_u64().to_le_bytes(); let hash: [u8; 32] = ethereum_hashing::hash_fixed(&offset_node_id); - let hash_prefix = [hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7]]; + let hash_prefix = [ + hash[0], hash[1], hash[2], hash[3], hash[4], hash[5], hash[6], hash[7], + ]; let hash_prefix_u64 = u64::from_le_bytes(hash_prefix); let subnet = hash_prefix_u64 % (T::data_column_subnet_count() as u64); @@ -149,6 +151,7 @@ impl From for Error { mod test { use crate::data_column_subnet_id::DataColumnSubnetId; use crate::ChainSpec; + use crate::EthSpec; #[test] fn test_compute_subnets_for_data_column() { @@ -168,32 +171,29 @@ mod test { .map(|v| ethereum_types::U256::from_dec_str(v).unwrap()) .collect::>(); - let expected_subnets = vec![ - vec![0], - vec![29], - vec![28], - vec![20], - vec![30], - vec![9], - vec![18], - vec![21], - vec![23], - vec![29], - ]; - let spec = ChainSpec::mainnet(); - for x in 0..node_ids.len() { + for node_id in node_ids { let computed_subnets = DataColumnSubnetId::compute_custody_subnets::< crate::MainnetEthSpec, - >(node_ids[x], spec.custody_requirement); - - assert_eq!( - expected_subnets[x], - computed_subnets - .map(DataColumnSubnetId::into) - .collect::>() - ); + >(node_id, spec.custody_requirement); + let computed_subnets: Vec<_> = computed_subnets.collect(); + + // the number of subnets is equal to the custody requirement + assert_eq!(computed_subnets.len() as u64, spec.custody_requirement); + + let subnet_count = crate::MainnetEthSpec::data_column_subnet_count(); + let columns_per_subnet = crate::MainnetEthSpec::number_of_columns() / subnet_count; + for subnet in computed_subnets { + let columns: Vec<_> = subnet.columns::().collect(); + // the number of columns is equal to the specified number of columns per subnet + assert_eq!(columns.len(), columns_per_subnet); + + for pair in columns.windows(2) { + // each successive column index is offset by the number of subnets + assert_eq!(pair[1] - pair[0], subnet_count as u64); + } + } } } } From a1b2ce1c2e46ff233678ba90e9832566d200feb0 Mon Sep 17 00:00:00 2001 From: jacobkaufmann Date: Mon, 18 Mar 2024 21:18:43 -0600 Subject: [PATCH 4/4] default to min custody value if not present in ENR --- beacon_node/lighthouse_network/src/discovery/enr.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/beacon_node/lighthouse_network/src/discovery/enr.rs b/beacon_node/lighthouse_network/src/discovery/enr.rs index 1303960a136..c95cbe8f61d 100644 --- a/beacon_node/lighthouse_network/src/discovery/enr.rs +++ b/beacon_node/lighthouse_network/src/discovery/enr.rs @@ -69,11 +69,12 @@ impl Eth2Enr for Enr { } fn custody_subnet_count(&self) -> Result { - // NOTE: given that a minimum is defined, we could map a non-existent key-value to the - // minimum value. + // NOTE: if the custody value is non-existent in the ENR, then we assume the minimum + // custody value defined in the spec. + let min_custody_bytes = TSpec::min_custody_requirement().as_ssz_bytes(); let custody_bytes = self .get(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) - .ok_or("ENR custody subnet countn non-existent")?; + .unwrap_or(&min_custody_bytes); u64::from_ssz_bytes(custody_bytes) .map_err(|_| "Could not decode the ENR custody subnet count") }