diff --git a/Cargo.lock b/Cargo.lock index 0d35dc71a70..fae1a803492 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2220,6 +2220,12 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "dtoa" version = "1.0.9" @@ -3263,6 +3269,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "fs2" version = "0.4.3" @@ -4474,6 +4486,7 @@ dependencies = [ "ethereum_ssz", "ethereum_ssz_derive", "hex", + "mockall", "serde", "tree_hash", ] @@ -5150,6 +5163,7 @@ dependencies = [ "libp2p-mplex", "lighthouse_metrics", "lighthouse_version", + "logging", "lru", "lru_cache", "parking_lot 0.12.1", @@ -5504,6 +5518,45 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9366861eb2a2c436c20b12c8dbec5f798cea6b47ad99216be0282942e2c81ea0" +[[package]] +name = "mockall" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43766c2b5203b10de348ffe19f7e54564b64f3d6018ff7648d1e2d6d3a0f0a48" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af7cbce79ec385a1d4f54baa90a76401eb15d9cab93685f62e7e9f942aa00ae2" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.60", +] + +[[package]] +name = "mockall_double" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1ca96e5ac35256ae3e13536edd39b172b88f41615e1d7b653c8ad24524113e8" +dependencies 
= [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "monitoring_api" version = "0.1.0" @@ -6461,6 +6514,32 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "predicates" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68b87bfd4605926cdfefc1c3b5f8fe560e3feca9d5552cf68c466d3d8236c7e8" +dependencies = [ + "anstyle", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174" + +[[package]] +name = "predicates-tree" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "pretty_reqwest_error" version = "0.1.0" @@ -8415,6 +8494,7 @@ dependencies = [ "futures", "lazy_static", "lighthouse_metrics", + "logging", "slog", "sloggers", "tokio", @@ -8452,6 +8532,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "test-test_logger" version = "0.1.0" @@ -9025,6 +9111,7 @@ dependencies = [ "maplit", "merkle_proof", "metastruct", + "mockall_double", "parking_lot 0.12.1", "paste", "rand", diff --git a/Cargo.toml b/Cargo.toml index 4d3e77f83d7..3e0631ec09e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -119,6 +119,8 @@ libsecp256k1 = "0.7" log = "0.4" lru = "0.12" maplit = "1" +mockall = "0.12" +mockall_double = "0.3" num_cpus = "1" parking_lot = "0.12" paste = "1" diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 2ce996c69af..e1a1828d81d 100644 --- 
a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -48,7 +48,7 @@ // returned alongside. #![allow(clippy::result_large_err)] -use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; +use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob, GossipVerifiedBlobList}; use crate::block_verification_types::{ AsBlock, BlockContentsError, BlockImportData, GossipVerifiedBlockContents, RpcBlock, }; @@ -104,8 +104,8 @@ use tree_hash::TreeHash; use types::data_column_sidecar::DataColumnSidecarError; use types::{ BeaconBlockRef, BeaconState, BeaconStateError, BlobSidecarList, ChainSpec, CloneConfig, - DataColumnSidecar, DataColumnSubnetId, Epoch, EthSpec, ExecutionBlockHash, Hash256, - InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock, + DataColumnSidecar, DataColumnSubnetId, Epoch, EthSpec, ExecutionBlockHash, FullPayload, + Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; use types::{BlobSidecar, ExecPayload}; @@ -741,43 +741,16 @@ impl IntoGossipVerifiedBlockContents for PublishBlockReq }) .transpose()?; - let gossip_verified_data_columns = gossip_verified_blobs - .as_ref() - .map(|blobs| { - // NOTE: we expect KZG to be initialized if the blobs are present - let kzg = chain - .kzg - .as_ref() - .ok_or(BlockContentsError::DataColumnError( - GossipDataColumnError::::KzgNotInitialized, - ))?; - - let blob_sidecar_list: Vec<_> = - blobs.iter().map(|blob| blob.clone_blob()).collect(); - let blob_sidecar_list = BlobSidecarList::new(blob_sidecar_list) - .map_err(DataColumnSidecarError::SszError)?; - let timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_COMPUTATION); - let sidecars = DataColumnSidecar::build_sidecars(&blob_sidecar_list, &block, kzg)?; - drop(timer); - let mut gossip_verified_data_columns = vec![]; - for sidecar in sidecars { - let subnet = 
DataColumnSubnetId::try_from_column_index::( - sidecar.index as usize, - ) - .map_err(|_| { - BlockContentsError::::DataColumnSidecarError( - DataColumnSidecarError::DataColumnIndexOutOfBounds, - ) - })?; - let column = GossipVerifiedDataColumn::new(sidecar, subnet.into(), chain)?; - gossip_verified_data_columns.push(column); - } - let gossip_verified_data_columns = - GossipVerifiedDataColumnList::new(gossip_verified_data_columns) - .map_err(DataColumnSidecarError::SszError)?; - Ok::<_, BlockContentsError>(gossip_verified_data_columns) - }) - .transpose()?; + let peer_das_enabled = chain + .spec + .peer_das_epoch + .map_or(false, |peer_das_epoch| block.epoch() >= peer_das_epoch); + + let gossip_verified_data_columns = if peer_das_enabled { + build_gossip_verified_data_columns(chain, &block, gossip_verified_blobs.as_ref())? + } else { + None + }; let gossip_verified_block = GossipVerifiedBlock::new(block, chain)?; @@ -793,6 +766,49 @@ impl IntoGossipVerifiedBlockContents for PublishBlockReq } } +fn build_gossip_verified_data_columns( + chain: &BeaconChain, + block: &SignedBeaconBlock>, + gossip_verified_blobs: Option<&GossipVerifiedBlobList>, +) -> Result>, BlockContentsError> { + gossip_verified_blobs + // Only attempt to build data columns if blobs are non-empty to avoid skewing the metrics. 
+ .filter(|b| !b.is_empty()) + .map(|blobs| { + // NOTE: we expect KZG to be initialized if the blobs are present + let kzg = chain + .kzg + .as_ref() + .ok_or(BlockContentsError::DataColumnError( + GossipDataColumnError::::KzgNotInitialized, + ))?; + + let blob_sidecar_list: Vec<_> = blobs.iter().map(|blob| blob.clone_blob()).collect(); + let blob_sidecar_list = BlobSidecarList::new(blob_sidecar_list) + .map_err(DataColumnSidecarError::SszError)?; + let timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_COMPUTATION); + let sidecars = DataColumnSidecar::build_sidecars(&blob_sidecar_list, block, kzg)?; + drop(timer); + let mut gossip_verified_data_columns = vec![]; + for sidecar in sidecars { + let subnet = + DataColumnSubnetId::try_from_column_index::(sidecar.index as usize) + .map_err(|_| { + BlockContentsError::::DataColumnSidecarError( + DataColumnSidecarError::DataColumnIndexOutOfBounds, + ) + })?; + let column = GossipVerifiedDataColumn::new(sidecar, subnet.into(), chain)?; + gossip_verified_data_columns.push(column); + } + let gossip_verified_data_columns = + GossipVerifiedDataColumnList::new(gossip_verified_data_columns) + .map_err(DataColumnSidecarError::SszError)?; + Ok::<_, BlockContentsError>(gossip_verified_data_columns) + }) + .transpose() +} + /// Implemented on types that can be converted into a `ExecutionPendingBlock`. /// /// Used to allow functions to accept blocks at various stages of verification. diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 813eb64d903..f6935fc90e1 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -84,8 +84,8 @@ enum RpcBlockInner { /// This variant is used with parent lookups and by-range responses. It should have all blobs /// ordered, all block roots matching, and the correct number of blobs for this block. 
BlockAndBlobs(Arc>, BlobSidecarList), - /// This variant is used with parent lookups and by-range responses. It should have all data columns - /// ordered, all block roots matching, and the correct number of data columns for this block. + /// This variant is used with parent lookups and by-range responses. It should have all + /// requested data columns, all block roots matching for this block. BlockAndDataColumns(Arc>, DataColumnSidecarList), } diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 9f99cad8fb6..c5d94555096 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -104,6 +104,7 @@ pub struct BeaconChainBuilder { kzg: Option>, task_executor: Option, validator_monitor_config: Option, + import_all_data_columns: bool, } impl @@ -145,6 +146,7 @@ where kzg: None, task_executor: None, validator_monitor_config: None, + import_all_data_columns: false, } } @@ -618,6 +620,12 @@ where self } + /// Sets whether to require and import all data columns when importing block. + pub fn import_all_data_columns(mut self, import_all_data_columns: bool) -> Self { + self.import_all_data_columns = import_all_data_columns; + self + } + /// Sets the `BeaconChain` event handler backend. /// /// For example, provide `ServerSentEventHandler` as a `handler`. 
@@ -968,8 +976,15 @@ where validator_monitor: RwLock::new(validator_monitor), genesis_backfill_slot, data_availability_checker: Arc::new( - DataAvailabilityChecker::new(slot_clock, self.kzg.clone(), store, &log, self.spec) - .map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?, + DataAvailabilityChecker::new( + slot_clock, + self.kzg.clone(), + store, + self.import_all_data_columns, + &log, + self.spec, + ) + .map_err(|e| format!("Error initializing DataAvailabilityChecker: {:?}", e))?, ), kzg: self.kzg.clone(), block_production_state: Arc::new(Mutex::new(None)), diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index d1f5cbd3b17..65533531423 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -7,7 +7,7 @@ use crate::data_availability_checker::overflow_lru_cache::OverflowLRUCache; use crate::{BeaconChain, BeaconChainTypes, BeaconStore}; use kzg::Kzg; use slasher::test_utils::E; -use slog::{debug, error, Logger}; +use slog::{debug, error, o, Logger}; use slot_clock::SlotClock; use ssz_types::FixedVector; use std::fmt; @@ -79,10 +79,26 @@ impl DataAvailabilityChecker { slot_clock: T::SlotClock, kzg: Option>, store: BeaconStore, + import_all_data_columns: bool, log: &Logger, spec: ChainSpec, ) -> Result { - let overflow_cache = OverflowLRUCache::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?; + let custody_subnet_count = if import_all_data_columns { + E::data_column_subnet_count() + } else { + E::min_custody_requirement() + }; + + let custody_column_count = + custody_subnet_count.saturating_mul(E::data_columns_per_subnet()); + let overflow_cache = OverflowLRUCache::new( + OVERFLOW_LRU_CAPACITY, + store, + custody_column_count, + log.new(o!("service" => "availability_cache")), + spec.clone(), + )?; + Ok(Self { availability_cache: Arc::new(overflow_cache), slot_clock, @@ -250,49 
+266,52 @@ impl DataAvailabilityChecker { block: RpcBlock, ) -> Result, AvailabilityCheckError> { let (block_root, block, blobs, data_columns) = block.deconstruct(); - match (blobs, data_columns) { - (None, None) => { - if self.blobs_required_for_block(&block) { - Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) - } else { - Ok(MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blobs: None, - blobs_available_timestamp: None, - data_columns: None, - })) - } - } - (maybe_blob_list, maybe_data_column_list) => { - let (verified_blobs, verified_data_column) = - if self.blobs_required_for_block(&block) { - let kzg = self - .kzg - .as_ref() - .ok_or(AvailabilityCheckError::KzgNotInitialized)?; - - if let Some(blob_list) = maybe_blob_list.as_ref() { - verify_kzg_for_blob_list(blob_list.iter(), kzg) - .map_err(AvailabilityCheckError::Kzg)?; - } - if let Some(data_column_list) = maybe_data_column_list.as_ref() { - verify_kzg_for_data_column_list(data_column_list.iter(), kzg) - .map_err(AvailabilityCheckError::Kzg)?; - } - (maybe_blob_list, maybe_data_column_list) - } else { - (None, None) - }; + if self.blobs_required_for_block(&block) { + return if let Some(blob_list) = blobs.as_ref() { + let kzg = self + .kzg + .as_ref() + .ok_or(AvailabilityCheckError::KzgNotInitialized)?; + verify_kzg_for_blob_list(blob_list.iter(), kzg) + .map_err(AvailabilityCheckError::Kzg)?; Ok(MaybeAvailableBlock::Available(AvailableBlock { block_root, block, - blobs: verified_blobs, + blobs, blobs_available_timestamp: None, - data_columns: verified_data_column, + data_columns: None, })) - } + } else { + Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) + }; + } + if self.data_columns_required_for_block(&block) { + return if let Some(data_column_list) = data_columns.as_ref() { + let kzg = self + .kzg + .as_ref() + .ok_or(AvailabilityCheckError::KzgNotInitialized)?; + verify_kzg_for_data_column_list(data_column_list.iter(), kzg) + 
.map_err(AvailabilityCheckError::Kzg)?; + Ok(MaybeAvailableBlock::Available(AvailableBlock { + block_root, + block, + blobs: None, + blobs_available_timestamp: None, + data_columns, + })) + } else { + Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) + }; } + + Ok(MaybeAvailableBlock::Available(AvailableBlock { + block_root, + block, + blobs: None, + blobs_available_timestamp: None, + data_columns: None, + })) } /// Checks if a vector of blocks are available. Returns a vector of `MaybeAvailableBlock` @@ -324,39 +343,46 @@ impl DataAvailabilityChecker { verify_kzg_for_blob_list(all_blobs.iter(), kzg)?; } + // TODO(das) verify kzg for all data columns + for block in blocks { let (block_root, block, blobs, data_columns) = block.deconstruct(); - match (blobs, data_columns) { - (None, None) => { - if self.blobs_required_for_block(&block) { - results.push(MaybeAvailableBlock::AvailabilityPending { block_root, block }) - } else { - results.push(MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blobs: None, - blobs_available_timestamp: None, - data_columns: None, - })) - } + + let maybe_available_block = if self.blobs_required_for_block(&block) { + if blobs.is_some() { + MaybeAvailableBlock::Available(AvailableBlock { + block_root, + block, + blobs, + blobs_available_timestamp: None, + data_columns: None, + }) + } else { + MaybeAvailableBlock::AvailabilityPending { block_root, block } } - (maybe_blob_list, maybe_data_column_list) => { - let (verified_blobs, verified_data_columns) = - if self.blobs_required_for_block(&block) { - (maybe_blob_list, maybe_data_column_list) - } else { - (None, None) - }; - // already verified kzg for all blobs - results.push(MaybeAvailableBlock::Available(AvailableBlock { + } else if self.data_columns_required_for_block(&block) { + if data_columns.is_some() { + MaybeAvailableBlock::Available(AvailableBlock { block_root, block, - blobs: verified_blobs, + blobs: None, blobs_available_timestamp: None, - 
data_columns: verified_data_columns, - })) + data_columns, + }) + } else { + MaybeAvailableBlock::AvailabilityPending { block_root, block } } - } + } else { + MaybeAvailableBlock::Available(AvailableBlock { + block_root, + block, + blobs: None, + blobs_available_timestamp: None, + data_columns: None, + }) + }; + + results.push(maybe_available_block); } Ok(results) @@ -365,7 +391,25 @@ impl DataAvailabilityChecker { /// Determines the blob requirements for a block. If the block is pre-deneb, no blobs are required. /// If the block's epoch is from prior to the data availability boundary, no blobs are required. fn blobs_required_for_block(&self, block: &SignedBeaconBlock) -> bool { - block.num_expected_blobs() > 0 && self.da_check_required_for_epoch(block.epoch()) + block.num_expected_blobs() > 0 + && self.da_check_required_for_epoch(block.epoch()) + && !self.is_peer_das_enabled_for_epoch(block.epoch()) + } + + /// Determines the data column requirements for a block. + /// - If the block is pre-peerdas, no data columns are required. + /// - If the block's epoch is from prior to the data availability boundary, no data columns are required. + fn data_columns_required_for_block(&self, block: &SignedBeaconBlock) -> bool { + block.num_expected_blobs() > 0 + && self.da_check_required_for_epoch(block.epoch()) + && self.is_peer_das_enabled_for_epoch(block.epoch()) + } + + /// Returns true if the given epoch is greater than or equal to the `PEER_DAS_EPOCH`. + fn is_peer_das_enabled_for_epoch(&self, block_epoch: Epoch) -> bool { + self.spec + .peer_das_epoch + .map_or(false, |peer_das_epoch| block_epoch >= peer_das_epoch) } /// The epoch at which we require a data availability check in block processing. 
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index 58aedc73ab1..e4843889bbf 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -10,6 +10,7 @@ pub enum Error { blob_commitment: KzgCommitment, block_commitment: KzgCommitment, }, + UnableToDetermineImportRequirement, Unexpected, SszTypes(ssz_types::Error), MissingBlobs, @@ -41,6 +42,7 @@ impl Error { | Error::Unexpected | Error::ParentStateMissing(_) | Error::BlockReplayError(_) + | Error::UnableToDetermineImportRequirement + | Error::RebuildingStateCaches(_) | Error::SlotClockError => ErrorCategory::Internal, Error::Kzg(_) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index dc29388bf9a..8db5ebdbad7 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -39,6 +39,7 @@ use crate::store::{DBColumn, KeyValueStore}; use crate::BeaconChainTypes; use lru::LruCache; use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; +use slog::{debug, trace, Logger}; use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use ssz_types::{FixedVector, VariableList}; @@ -52,14 +53,21 @@ use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256}; /// /// The blobs are all gossip and kzg verified. /// The block has completed all verifications except the availability check. +/// TODO(das): this struct can potentially be refactored as blobs and data columns are mutually +/// exclusive and this could simplify `is_importable`. 
#[derive(Encode, Decode, Clone)] pub struct PendingComponents { pub block_root: Hash256, pub verified_blobs: FixedVector>, E::MaxBlobsPerBlock>, - pub verified_data_columns: FixedVector>, E::DataColumnCount>, + pub verified_data_columns: VariableList, E::DataColumnCount>, pub executed_block: Option>, } +pub enum BlockImportRequirement { + AllBlobs, + CustodyColumns(usize), +} + impl PendingComponents { /// Returns an immutable reference to the cached block. pub fn get_cached_block(&self) -> &Option> { @@ -73,10 +81,20 @@ impl PendingComponents { &self.verified_blobs } - /// Returns an immutable reference to the fixed vector of cached data columns. + /// Returns an immutable reference to the cached data column of the given index, if it exists. + pub fn get_cached_data_column( + &self, + data_column_index: u64, + ) -> Option<&KzgVerifiedDataColumn> { + self.verified_data_columns + .iter() + .find(|d| d.data_column_index() == data_column_index) + } + + /// Returns an immutable reference to the list of cached data columns. + pub fn get_cached_data_columns( + &self, + ) -> &VariableList, E::DataColumnCount> { + &self.verified_data_columns + } @@ -95,7 +113,7 @@ impl PendingComponents { /// Returns a mutable reference to the fixed vector of cached data columns. pub fn get_cached_data_columns_mut( &mut self, - ) -> &mut FixedVector>, E::DataColumnCount> { + ) -> &mut VariableList, E::DataColumnCount> { &mut self.verified_data_columns } @@ -111,16 +129,15 @@ impl PendingComponents { .unwrap_or(false) } - /// Checks if a data column exists at the given index in the cache. + /// Checks if a data column of a given index exists in the cache. /// /// Returns: - /// - `true` if a data column exists at the given index. + /// - `true` if a data column for the given index exists. /// - `false` otherwise. 
- fn data_column_exists(&self, data_colum_index: usize) -> bool { + fn data_column_exists(&self, data_colum_index: u64) -> bool { self.get_cached_data_columns() - .get(data_colum_index) - .map(|d| d.is_some()) - .unwrap_or(false) + .iter() + .any(|d| d.data_column_index() == data_colum_index) } /// Returns the number of blobs that are expected to be present. Returns `None` if we don't have a @@ -138,6 +155,11 @@ impl PendingComponents { self.get_cached_blobs().iter().flatten().count() } + /// Returns the number of data columns that have been received and are stored in the cache. + pub fn num_received_data_columns(&self) -> usize { + self.get_cached_data_columns().iter().count() + } + /// Inserts a block into the cache. pub fn insert_block(&mut self, block: DietAvailabilityPendingExecutedBlock) { *self.get_cached_block_mut() = Some(block) @@ -185,40 +207,18 @@ impl PendingComponents { } } - /// Inserts a data column at a specific index in the cache. - /// - /// Existing data column at the index will be replaced. - fn insert_data_column_at_index( - &mut self, - data_column_index: usize, - data_column: KzgVerifiedDataColumn, - ) { - if let Some(b) = self - .get_cached_data_columns_mut() - .get_mut(data_column_index) - { - *b = Some(data_column); - } - } - /// Merges a given set of data columns into the cache. - /// - /// Data columns are only inserted if: - /// 1. The data column entry at the index is empty and no block exists. - /// 2. The block exists and its commitments matches the data column's commitments. 
- fn merge_data_columns( + fn merge_data_columns>>( &mut self, - data_columns: FixedVector>, E::DataColumnCount>, - ) { - for (index, data_column) in data_columns.iter().cloned().enumerate() { - let Some(data_column) = data_column else { - continue; - }; + kzg_verified_data_columns: I, + ) -> Result<(), AvailabilityCheckError> { + for data_column in kzg_verified_data_columns { // TODO(das): Add equivalent checks for data columns if necessary - if !self.data_column_exists(index) { - self.insert_data_column_at_index(index, data_column) + if !self.data_column_exists(data_column.data_column_index()) { + self.verified_data_columns.push(data_column)?; } } + Ok(()) } /// Inserts a new block and revalidates the existing blobs against it. @@ -230,15 +230,48 @@ impl PendingComponents { self.merge_blobs(reinsert); } - /// Checks if the block and all of its expected blobs are available in the cache. + /// Checks if the block and all of its expected blobs or custody columns (post-PeerDAS) are + /// available in the cache. /// - /// Returns `true` if both the block exists and the number of received blobs matches the number - /// of expected blobs. - pub fn is_available(&self) -> bool { - if let Some(num_expected_blobs) = self.num_expected_blobs() { - num_expected_blobs == self.num_received_blobs() - } else { - false + /// Returns `true` if both the block exists and the number of received blobs / custody columns + /// matches the number of expected blobs / custody columns. 
+ pub fn is_available( + &self, + block_import_requirement: BlockImportRequirement, + log: &Logger, + ) -> bool { + match block_import_requirement { + BlockImportRequirement::AllBlobs => { + debug!( + log, + "Checking block and blob importability"; + "block_root" => %self.block_root, + "num_expected_blobs" => ?self.num_expected_blobs(), + "num_received_blobs" => self.num_received_blobs(), + ); + if let Some(num_expected_blobs) = self.num_expected_blobs() { + num_expected_blobs == self.num_received_blobs() + } else { + false + } + } + BlockImportRequirement::CustodyColumns(num_expected_columns) => { + let num_received_data_columns = self.num_received_data_columns(); + + trace!( + log, + "Checking block and data column importability"; + "block_root" => %self.block_root, + "num_received_data_columns" => num_received_data_columns, + ); + + if let Some(num_expected_blobs) = self.num_expected_blobs() { + // No data columns when there are 0 blobs + num_expected_blobs == 0 || num_expected_columns == num_received_data_columns + } else { + false + } + } } } @@ -247,7 +280,7 @@ impl PendingComponents { Self { block_root, verified_blobs: FixedVector::default(), - verified_data_columns: FixedVector::default(), + verified_data_columns: VariableList::default(), executed_block: None, } } @@ -295,8 +328,7 @@ impl PendingComponents { // TODO(das) Do we need a check here for number of expected custody columns? 
let verified_data_columns = verified_data_columns .into_iter() - .cloned() - .filter_map(|d| d.map(|d| d.to_data_column())) + .map(|d| d.to_data_column()) .collect::>() .into(); @@ -336,16 +368,15 @@ impl PendingComponents { }); } } - for maybe_data_column in self.verified_data_columns.iter() { - if maybe_data_column.is_some() { - return maybe_data_column.as_ref().map(|kzg_verified_data_column| { - kzg_verified_data_column - .as_data_column() - .slot() - .epoch(E::slots_per_epoch()) - }); - } + + if let Some(kzg_verified_data_column) = self.verified_data_columns.first() { + let epoch = kzg_verified_data_column + .as_data_column() + .slot() + .epoch(E::slots_per_epoch()); + return Some(epoch); } + None }) } @@ -434,10 +465,7 @@ impl OverflowStore { .put_bytes(col.as_str(), &key.as_ssz_bytes(), &blob.as_ssz_bytes())? } - for data_column in Vec::from(pending_components.verified_data_columns) - .into_iter() - .flatten() - { + for data_column in pending_components.verified_data_columns.into_iter() { let key = OverflowKey::from_data_column_id::(DataColumnIdentifier { block_root, index: data_column.data_column_index(), @@ -483,15 +511,13 @@ impl OverflowStore { .ok_or(AvailabilityCheckError::BlobIndexInvalid(index as u64))? = Some(KzgVerifiedBlob::from_ssz_bytes(value_bytes.as_slice())?); } - OverflowKey::DataColumn(_, index) => { - *maybe_pending_components + OverflowKey::DataColumn(_, _index) => { + let data_column = + KzgVerifiedDataColumn::from_ssz_bytes(value_bytes.as_slice())?; + maybe_pending_components .get_or_insert_with(|| PendingComponents::empty(block_root)) - .verified_data_columns - .get_mut(index as usize) - .ok_or(AvailabilityCheckError::DataColumnIndexInvalid(index as u64))? 
= - Some(KzgVerifiedDataColumn::from_ssz_bytes( - value_bytes.as_slice(), - )?); + .get_cached_data_columns_mut() + .push(data_column)?; } } } @@ -608,12 +634,7 @@ impl Critical { ) -> Result>>, AvailabilityCheckError> { if let Some(pending_components) = self.in_memory.peek(&data_column_id.block_root) { Ok(pending_components - .verified_data_columns - .get(data_column_id.index as usize) - .ok_or(AvailabilityCheckError::DataColumnIndexInvalid( - data_column_id.index, - ))? - .as_ref() + .get_cached_data_column(data_column_id.index) .map(|data_column| data_column.clone_data_column())) } else { Ok(None) @@ -694,12 +715,18 @@ pub struct OverflowLRUCache { maintenance_lock: Mutex<()>, /// The capacity of the LRU cache capacity: NonZeroUsize, + /// The number of data columns the node is custodying. + custody_column_count: usize, + log: Logger, + spec: ChainSpec, } impl OverflowLRUCache { pub fn new( capacity: NonZeroUsize, beacon_store: BeaconStore, + custody_column_count: usize, + log: Logger, spec: ChainSpec, ) -> Result { let overflow_store = OverflowStore(beacon_store.clone()); @@ -708,9 +735,12 @@ impl OverflowLRUCache { Ok(Self { critical: RwLock::new(critical), overflow_store, - state_cache: StateLRUCache::new(beacon_store, spec), + state_cache: StateLRUCache::new(beacon_store, spec.clone()), maintenance_lock: Mutex::new(()), capacity, + custody_column_count, + log, + spec, }) } @@ -759,6 +789,27 @@ impl OverflowLRUCache { } } + fn block_import_requirement( + &self, + pending_components: &PendingComponents, + ) -> Result { + let epoch = pending_components + .epoch() + .ok_or(AvailabilityCheckError::UnableToDetermineImportRequirement)?; + + let peer_das_enabled = self + .spec + .peer_das_epoch + .map_or(false, |peer_das_epoch| epoch >= peer_das_epoch); + if peer_das_enabled { + Ok(BlockImportRequirement::CustodyColumns( + self.custody_column_count, + )) + } else { + Ok(BlockImportRequirement::AllBlobs) + } + } + pub fn put_kzg_verified_data_columns< I: 
IntoIterator>, >( @@ -766,16 +817,6 @@ impl OverflowLRUCache { block_root: Hash256, kzg_verified_data_columns: I, ) -> Result, AvailabilityCheckError> { - let mut fixed_data_columns = FixedVector::default(); - - for data_column in kzg_verified_data_columns { - if let Some(data_column_opt) = - fixed_data_columns.get_mut(data_column.data_column_index() as usize) - { - *data_column_opt = Some(data_column); - } - } - let mut write_lock = self.critical.write(); // Grab existing entry or create a new entry. @@ -784,12 +825,23 @@ impl OverflowLRUCache { .unwrap_or_else(|| PendingComponents::empty(block_root)); // Merge in the data columns. - pending_components.merge_data_columns(fixed_data_columns); - - write_lock.put_pending_components(block_root, pending_components, &self.overflow_store)?; + pending_components.merge_data_columns(kzg_verified_data_columns)?; - // TODO(das): Currently this does not change availability status and nor import yet. - Ok(Availability::MissingComponents(block_root)) + let block_import_requirement = self.block_import_requirement(&pending_components)?; + if pending_components.is_available(block_import_requirement, &self.log) { + // No need to hold the write lock anymore + drop(write_lock); + pending_components.make_available(|diet_block| { + self.state_cache.recover_pending_executed_block(diet_block) + }) + } else { + write_lock.put_pending_components( + block_root, + pending_components, + &self.overflow_store, + )?; + Ok(Availability::MissingComponents(block_root)) + } } pub fn put_kzg_verified_blobs>>( @@ -815,7 +867,8 @@ impl OverflowLRUCache { // Merge in the blobs. 
pending_components.merge_blobs(fixed_blobs); - if pending_components.is_available() { + let block_import_requirement = self.block_import_requirement(&pending_components)?; + if pending_components.is_available(block_import_requirement, &self.log) { // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(|diet_block| { @@ -854,7 +907,8 @@ impl OverflowLRUCache { pending_components.merge_block(diet_executed_block); // Check if we have all components and entire set is consistent. - if pending_components.is_available() { + let block_import_requirement = self.block_import_requirement(&pending_components)?; + if pending_components.is_available(block_import_requirement, &self.log) { // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(|diet_block| { @@ -1163,6 +1217,7 @@ mod test { use types::{ExecPayload, MinimalEthSpec}; const LOW_VALIDATOR_COUNT: usize = 32; + const DEFAULT_TEST_CUSTODY_COLUMN_COUNT: usize = 4; fn get_store_with_spec( db_path: &TempDir, @@ -1383,8 +1438,14 @@ mod test { let test_store = harness.chain.store.clone(); let capacity_non_zero = new_non_zero_usize(capacity); let cache = Arc::new( - OverflowLRUCache::::new(capacity_non_zero, test_store, spec.clone()) - .expect("should create cache"), + OverflowLRUCache::::new( + capacity_non_zero, + test_store, + DEFAULT_TEST_CUSTODY_COLUMN_COUNT, + harness.logger().clone(), + spec.clone(), + ) + .expect("should create cache"), ); (harness, cache, chain_db_path) } @@ -1887,6 +1948,8 @@ mod test { let recovered_cache = OverflowLRUCache::::new( new_non_zero_usize(capacity), harness.chain.store.clone(), + DEFAULT_TEST_CUSTODY_COLUMN_COUNT, + harness.logger().clone(), harness.chain.spec.clone(), ) .expect("should recover cache"); diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index e7f201b8521..701ea689006 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -206,6 
+206,7 @@ where .graffiti(graffiti) .event_handler(event_handler) .execution_layer(execution_layer) + .import_all_data_columns(config.network.subscribe_all_data_column_subnets) .validator_monitor_config(config.validator_monitor.clone()); let builder = if let Some(slasher) = self.slasher.clone() { diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index cc117c3fb92..4f5f6bd6948 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1261,12 +1261,14 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |block_contents: PublishBlockRequest, task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { publish_blocks::publish_block( @@ -1277,6 +1279,7 @@ pub fn serve( log, BroadcastValidation::default(), duplicate_block_status_code, + network_globals, ) .await }) @@ -1292,6 +1295,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |block_bytes: Bytes, @@ -1299,6 +1303,7 @@ pub fn serve( task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { let block_contents = PublishBlockRequest::::from_ssz_bytes( @@ -1316,6 +1321,7 @@ pub fn serve( log, BroadcastValidation::default(), duplicate_block_status_code, + network_globals, ) .await }) @@ -1331,6 +1337,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |validation_level: api_types::BroadcastValidationQuery, @@ -1338,6 +1345,7 @@ pub fn serve( task_spawner: 
TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { publish_blocks::publish_block( @@ -1348,6 +1356,7 @@ pub fn serve( log, validation_level.broadcast_validation, duplicate_block_status_code, + network_globals, ) .await }) @@ -1364,6 +1373,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |validation_level: api_types::BroadcastValidationQuery, @@ -1372,6 +1382,7 @@ pub fn serve( task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { let block_contents = PublishBlockRequest::::from_ssz_bytes( @@ -1389,6 +1400,7 @@ pub fn serve( log, validation_level.broadcast_validation, duplicate_block_status_code, + network_globals, ) .await }) @@ -1408,12 +1420,14 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |block_contents: Arc>, task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { publish_blocks::publish_blinded_block( @@ -1423,6 +1437,7 @@ pub fn serve( log, BroadcastValidation::default(), duplicate_block_status_code, + network_globals, ) .await }) @@ -1438,12 +1453,14 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |block_bytes: Bytes, task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { let block = 
SignedBlindedBeaconBlock::::from_ssz_bytes( @@ -1461,6 +1478,7 @@ pub fn serve( log, BroadcastValidation::default(), duplicate_block_status_code, + network_globals, ) .await }) @@ -1476,6 +1494,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |validation_level: api_types::BroadcastValidationQuery, @@ -1483,6 +1502,7 @@ pub fn serve( task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { publish_blocks::publish_blinded_block( @@ -1492,6 +1512,7 @@ pub fn serve( log, validation_level.broadcast_validation, duplicate_block_status_code, + network_globals, ) .await }) @@ -1507,6 +1528,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) + .and(network_globals.clone()) .and(log_filter.clone()) .then( move |validation_level: api_types::BroadcastValidationQuery, @@ -1514,6 +1536,7 @@ pub fn serve( task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, + network_globals: Arc>, log: Logger| { task_spawner.spawn_async_with_rejection(Priority::P0, async move { let block = SignedBlindedBeaconBlock::::from_ssz_bytes( @@ -1531,6 +1554,7 @@ pub fn serve( log, validation_level.broadcast_validation, duplicate_block_status_code, + network_globals, ) .await }) diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 59ab3388d8f..7155f6c4889 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -9,7 +9,7 @@ use beacon_chain::{ use eth2::types::{into_full_block_and_blobs, BroadcastValidation, ErrorMessage}; use eth2::types::{FullPayloadContents, PublishBlockRequest}; use execution_layer::ProvenancedPayload; -use lighthouse_network::PubsubMessage; +use 
lighthouse_network::{NetworkGlobals, PubsubMessage}; use network::NetworkMessage; use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; @@ -46,6 +46,7 @@ impl> ProvenancedBloc } /// Handles a request from the HTTP API for full blocks. +#[allow(clippy::too_many_arguments)] pub async fn publish_block>( block_root: Option, provenanced_block: ProvenancedBlock, @@ -54,6 +55,7 @@ pub async fn publish_block>, ) -> Result { let seen_timestamp = timestamp_now(); @@ -238,6 +240,35 @@ pub async fn publish_block &msg + ); + Err(warp_utils::reject::custom_bad_request(msg)) + }; + } + } + } + } + match Box::pin(chain.process_block( block_root, gossip_verified_block, @@ -324,6 +355,7 @@ pub async fn publish_blinded_block( log: Logger, validation_level: BroadcastValidation, duplicate_status_code: StatusCode, + network_globals: Arc>, ) -> Result { let block_root = blinded_block.canonical_root(); let full_block: ProvenancedBlock> = @@ -336,6 +368,7 @@ pub async fn publish_blinded_block( log, validation_level, duplicate_status_code, + network_globals, ) .await } diff --git a/beacon_node/http_api/tests/broadcast_validation_tests.rs b/beacon_node/http_api/tests/broadcast_validation_tests.rs index 6a3f7947e6b..702b636ff9c 100644 --- a/beacon_node/http_api/tests/broadcast_validation_tests.rs +++ b/beacon_node/http_api/tests/broadcast_validation_tests.rs @@ -370,6 +370,7 @@ pub async fn consensus_partial_pass_only_consensus() { /* submit `block_b` which should induce equivocation */ let channel = tokio::sync::mpsc::unbounded_channel(); + let network_globals = tester.ctx.network_globals.clone().unwrap(); let publication_result = publish_block( None, @@ -379,6 +380,7 @@ pub async fn consensus_partial_pass_only_consensus() { test_logger, validation_level.unwrap(), StatusCode::ACCEPTED, + network_globals, ) .await; @@ -659,6 +661,7 @@ pub async fn equivocation_consensus_late_equivocation() { assert!(gossip_block_contents_a.is_err()); let channel = 
tokio::sync::mpsc::unbounded_channel(); + let network_globals = tester.ctx.network_globals.clone().unwrap(); let publication_result = publish_block( None, @@ -668,6 +671,7 @@ pub async fn equivocation_consensus_late_equivocation() { test_logger, validation_level.unwrap(), StatusCode::ACCEPTED, + network_globals, ) .await; @@ -1305,6 +1309,7 @@ pub async fn blinded_equivocation_consensus_late_equivocation() { assert!(gossip_block_a.is_err()); let channel = tokio::sync::mpsc::unbounded_channel(); + let network_globals = tester.ctx.network_globals.clone().unwrap(); let publication_result = publish_blinded_block( block_b, @@ -1313,6 +1318,7 @@ pub async fn blinded_equivocation_consensus_late_equivocation() { test_logger, validation_level.unwrap(), StatusCode::ACCEPTED, + network_globals, ) .await; diff --git a/beacon_node/lighthouse_network/Cargo.toml b/beacon_node/lighthouse_network/Cargo.toml index 494fd6892a9..a8e8e13668a 100644 --- a/beacon_node/lighthouse_network/Cargo.toml +++ b/beacon_node/lighthouse_network/Cargo.toml @@ -70,6 +70,7 @@ tempfile = { workspace = true } quickcheck = { workspace = true } quickcheck_macros = { workspace = true } async-channel = { workspace = true } +logging = { workspace = true } [features] libp2p-websocket = [] diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs index 03f530db4d1..5189706a3bc 100644 --- a/beacon_node/lighthouse_network/src/config.rs +++ b/beacon_node/lighthouse_network/src/config.rs @@ -100,6 +100,9 @@ pub struct Config { /// Attempt to construct external port mappings with UPnP. pub upnp_enabled: bool, + /// Subscribe to all data column subnets for the duration of the runtime. + pub subscribe_all_data_column_subnets: bool, + /// Subscribe to all subnets for the duration of the runtime. 
pub subscribe_all_subnets: bool, @@ -338,6 +341,7 @@ impl Default for Config { upnp_enabled: true, network_load: 4, private: false, + subscribe_all_data_column_subnets: false, subscribe_all_subnets: false, import_all_attestations: false, shutdown_after_sync: false, diff --git a/beacon_node/lighthouse_network/src/discovery/enr.rs b/beacon_node/lighthouse_network/src/discovery/enr.rs index a2aa640eb2b..00ce790da28 100644 --- a/beacon_node/lighthouse_network/src/discovery/enr.rs +++ b/beacon_node/lighthouse_network/src/discovery/enr.rs @@ -238,7 +238,11 @@ pub fn build_enr( builder.add_value(SYNC_COMMITTEE_BITFIELD_ENR_KEY, &bitfield.as_ssz_bytes()); // set the "custody_subnet_count" field on our ENR - let custody_subnet_count = E::min_custody_requirement() as u64; + let custody_subnet_count = if config.subscribe_all_data_column_subnets { + E::data_column_subnet_count() as u64 + } else { + E::min_custody_requirement() as u64 + }; builder.add_value( PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index f9ed2c9f740..ff2cb97d057 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -2,12 +2,13 @@ use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV2}; use crate::types::{BackFillState, SyncState}; -use crate::Client; use crate::EnrExt; +use crate::{Client, Eth2Enr}; use crate::{Enr, GossipTopic, Multiaddr, PeerId}; use parking_lot::RwLock; use std::collections::HashSet; -use types::EthSpec; +use types::data_column_sidecar::ColumnIndex; +use types::{DataColumnSubnetId, Epoch, EthSpec}; pub struct NetworkGlobals { /// The current local ENR. @@ -110,6 +111,17 @@ impl NetworkGlobals { std::mem::replace(&mut *self.sync_state.write(), new_state) } + /// Compute custody data columns the node is assigned to custody. 
+ pub fn custody_columns(&self, _epoch: Epoch) -> Result, &'static str> { + let enr = self.local_enr(); + let node_id = enr.node_id().raw().into(); + let custody_subnet_count = enr.custody_subnet_count::()?; + Ok( + DataColumnSubnetId::compute_custody_columns::(node_id, custody_subnet_count) + .collect(), + ) + } + /// TESTING ONLY. Build a dummy NetworkGlobals instance. pub fn new_test_globals(trusted_peers: Vec, log: &slog::Logger) -> NetworkGlobals { use crate::CombinedKeyExt; @@ -129,3 +141,20 @@ impl NetworkGlobals { ) } } + +#[cfg(test)] +mod test { + use crate::NetworkGlobals; + use types::{Epoch, EthSpec, MainnetEthSpec as E}; + + #[test] + fn test_custody_count_default() { + let log = logging::test_logger(); + let default_custody_requirement_column_count = + E::number_of_columns() / E::data_column_subnet_count(); + let globals = NetworkGlobals::::new_test_globals(vec![], &log); + let any_epoch = Epoch::new(0); + let columns = globals.custody_columns(any_epoch).unwrap(); + assert_eq!(columns.len(), default_custody_requirement_column_count); + } +} diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 19f0fad6d95..4a1bd2c6621 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -191,6 +191,8 @@ pub struct NetworkService { next_fork_subscriptions: Pin>>, /// A delay that expires when we need to unsubscribe from old fork topics. next_unsubscribe: Pin>>, + /// Subscribe to all the data column subnets. + subscribe_all_data_column_subnets: bool, /// Subscribe to all the subnets once synced. subscribe_all_subnets: bool, /// Shutdown beacon node after sync is complete. 
@@ -357,6 +359,7 @@ impl NetworkService { next_fork_update, next_fork_subscriptions, next_unsubscribe, + subscribe_all_data_column_subnets: config.subscribe_all_data_column_subnets, subscribe_all_subnets: config.subscribe_all_subnets, shutdown_after_sync: config.shutdown_after_sync, metrics_enabled: config.metrics_enabled, @@ -733,7 +736,25 @@ impl NetworkService { } } - if !self.subscribe_all_subnets { + if self.subscribe_all_data_column_subnets { + for column_subnet in 0..T::EthSpec::data_column_subnet_count() as u64 { + for fork_digest in self.required_gossip_fork_digests() { + let gossip_kind = + Subnet::DataColumn(DataColumnSubnetId::new(column_subnet)).into(); + let topic = GossipTopic::new( + gossip_kind, + GossipEncoding::default(), + fork_digest, + ); + if self.libp2p.subscribe(topic.clone()) { + subscribed_topics.push(topic); + } else { + warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); + } + } + } + } else { + // TODO(das): subscribe after `PEER_DAS_EPOCH` for column_subnet in DataColumnSubnetId::compute_custody_subnets::( self.network_globals.local_enr().node_id().raw().into(), self.network_globals @@ -790,23 +811,6 @@ impl NetworkService { } } } - // Subscribe to all data column subnets - for column_subnet in 0..T::EthSpec::data_column_subnet_count() as u64 { - for fork_digest in self.required_gossip_fork_digests() { - let gossip_kind = - Subnet::DataColumn(DataColumnSubnetId::new(column_subnet)).into(); - let topic = GossipTopic::new( - gossip_kind, - GossipEncoding::default(), - fork_digest, - ); - if self.libp2p.subscribe(topic.clone()) { - subscribed_topics.push(topic); - } else { - warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); - } - } - } } if !subscribed_topics.is_empty() { diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 59cb6636d57..c2094e7b930 100644 --- 
a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -652,7 +652,7 @@ mod tests { HotColdDB::open_ephemeral(StoreConfig::default(), ChainSpec::minimal(), log.clone()) .expect("store"); let da_checker = Arc::new( - DataAvailabilityChecker::new(slot_clock, None, store.into(), &log, spec.clone()) + DataAvailabilityChecker::new(slot_clock, None, store.into(), false, &log, spec.clone()) .expect("data availability checker"), ); let mut sl = SingleBlockLookup::::new( @@ -685,7 +685,7 @@ mod tests { .expect("store"); let da_checker = Arc::new( - DataAvailabilityChecker::new(slot_clock, None, store.into(), &log, spec.clone()) + DataAvailabilityChecker::new(slot_clock, None, store.into(), false, &log, spec.clone()) .expect("data availability checker"), ); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 818cdbd460f..19633cc0b0e 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -38,6 +38,14 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { /* * Network parameters. */ + .arg( + Arg::with_name("subscribe-all-data-column-subnets") + .long("subscribe-all-data-column-subnets") + .help("Subscribe to all data column subnets and participate in data custody for \ + all columns. 
This will also advertise the beacon node as being long-lived \ + subscribed to all data column subnets.") + .takes_value(false), + ) .arg( Arg::with_name("subscribe-all-subnets") .long("subscribe-all-subnets") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index fd2cf473cb3..5493386bd0e 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -1129,6 +1129,10 @@ pub fn set_network_config( config.network_dir = data_dir.join(DEFAULT_NETWORK_DIR); }; + if cli_args.is_present("subscribe-all-data-column-subnets") { + config.subscribe_all_data_column_subnets = true; + } + if cli_args.is_present("subscribe-all-subnets") { config.subscribe_all_subnets = true; } diff --git a/book/src/help_bn.md b/book/src/help_bn.md index e437925a0e8..b628cae145a 100644 --- a/book/src/help_bn.md +++ b/book/src/help_bn.md @@ -108,6 +108,9 @@ FLAGS: server on localhost:5052 and import deposit logs from the execution node. This is equivalent to `--http` on merge-ready networks, or `--http --eth1` pre-merge + --subscribe-all-data-column-subnets Subscribe to all data column subnets and participate in data custody for + all columns. This will also advertise the beacon node as being long-lived + subscribed to all data column subnets. --subscribe-all-subnets Subscribe to all subnets regardless of validator count. This will also advertise the beacon node as being long-lived subscribed to all subnets. 
--validator-monitor-auto Enables the automatic detection and monitoring of validators connected to diff --git a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml index c8695123ab0..8c3d71d6748 100644 --- a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml @@ -53,6 +53,8 @@ DENEB_FORK_EPOCH: 269568 # March 13, 2024, 01:55:35pm UTC # Electra ELECTRA_FORK_VERSION: 0x05000000 ELECTRA_FORK_EPOCH: 18446744073709551615 +# PeerDAS +PEER_DAS_EPOCH: 18446744073709551615 # Time parameters diff --git a/common/task_executor/Cargo.toml b/common/task_executor/Cargo.toml index b3d58fa5ea8..cc9a2c5097b 100644 --- a/common/task_executor/Cargo.toml +++ b/common/task_executor/Cargo.toml @@ -12,3 +12,4 @@ futures = { workspace = true } lazy_static = { workspace = true } lighthouse_metrics = { workspace = true } sloggers = { workspace = true } +logging = { workspace = true } diff --git a/common/task_executor/src/test_utils.rs b/common/task_executor/src/test_utils.rs index 6e372d97575..ec8f45d850e 100644 --- a/common/task_executor/src/test_utils.rs +++ b/common/task_executor/src/test_utils.rs @@ -1,4 +1,5 @@ use crate::TaskExecutor; +use logging::test_logger; use slog::Logger; use sloggers::{null::NullLoggerBuilder, Build}; use std::sync::Arc; @@ -26,7 +27,7 @@ impl Default for TestRuntime { fn default() -> Self { let (runtime_shutdown, exit) = async_channel::bounded(1); let (shutdown_tx, _) = futures::channel::mpsc::channel(1); - let log = null_logger().unwrap(); + let log = test_logger(); let (runtime, handle) = if let Ok(handle) = runtime::Handle::try_current() { (None, handle) diff --git a/consensus/types/Cargo.toml b/consensus/types/Cargo.toml index e77cd67ed15..a0abae8a7b3 100644 --- a/consensus/types/Cargo.toml +++ b/consensus/types/Cargo.toml @@ -60,6 +60,7 @@ eth2_network_config = { 
workspace = true } state_processing = { workspace = true } tokio = { workspace = true } paste = { workspace = true } +mockall_double = { workspace = true } [features] default = ["sqlite", "legacy-arith"] diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index a9e9e4ab138..e8a375a8591 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -194,6 +194,7 @@ pub struct ChainSpec { /* * DAS params */ + pub peer_das_epoch: Option, pub custody_requirement: u64, /* @@ -763,6 +764,7 @@ impl ChainSpec { /* * DAS params */ + peer_das_epoch: None, custody_requirement: 1, /* @@ -867,6 +869,8 @@ impl ChainSpec { // Electra electra_fork_version: [0x05, 0x00, 0x00, 0x01], electra_fork_epoch: None, + // PeerDAS + peer_das_epoch: None, // Other network_id: 2, // lighthouse testnet network id deposit_chain_id: 5, @@ -1071,6 +1075,7 @@ impl ChainSpec { /* * DAS params */ + peer_das_epoch: None, custody_requirement: 1, /* * Network specific @@ -1206,6 +1211,11 @@ pub struct Config { #[serde(deserialize_with = "deserialize_fork_epoch")] pub electra_fork_epoch: Option>, + #[serde(default)] + #[serde(serialize_with = "serialize_fork_epoch")] + #[serde(deserialize_with = "deserialize_fork_epoch")] + pub peer_das_epoch: Option>, + #[serde(with = "serde_utils::quoted_u64")] seconds_per_slot: u64, #[serde(with = "serde_utils::quoted_u64")] @@ -1596,6 +1606,10 @@ impl Config { .electra_fork_epoch .map(|epoch| MaybeQuoted { value: epoch }), + peer_das_epoch: spec + .peer_das_epoch + .map(|epoch| MaybeQuoted { value: epoch }), + seconds_per_slot: spec.seconds_per_slot, seconds_per_eth1_block: spec.seconds_per_eth1_block, min_validator_withdrawability_delay: spec.min_validator_withdrawability_delay, @@ -1673,6 +1687,7 @@ impl Config { deneb_fork_version, electra_fork_epoch, electra_fork_version, + peer_das_epoch, seconds_per_slot, seconds_per_eth1_block, min_validator_withdrawability_delay, @@ -1734,6 +1749,7 @@ impl Config { 
deneb_fork_version, electra_fork_epoch: electra_fork_epoch.map(|q| q.value), electra_fork_version, + peer_das_epoch: peer_das_epoch.map(|q| q.value), seconds_per_slot, seconds_per_eth1_block, min_validator_withdrawability_delay, diff --git a/consensus/types/src/data_column_sidecar.rs b/consensus/types/src/data_column_sidecar.rs index 885dc243cfd..6b1925f3c4c 100644 --- a/consensus/types/src/data_column_sidecar.rs +++ b/consensus/types/src/data_column_sidecar.rs @@ -7,8 +7,12 @@ use crate::{ }; use bls::Signature; use derivative::Derivative; -use kzg::{Blob as KzgBlob, Error as KzgError, Kzg}; +#[cfg_attr(test, double)] +use kzg::Kzg; +use kzg::{Blob as KzgBlob, Error as KzgError}; use kzg::{KzgCommitment, KzgProof}; +#[cfg(test)] +use mockall_double::double; use safe_arith::ArithError; use serde::{Deserialize, Serialize}; use ssz::Encode; @@ -76,6 +80,9 @@ impl DataColumnSidecar { block: &SignedBeaconBlock, kzg: &Kzg, ) -> Result, DataColumnSidecarError> { + if blobs.is_empty() { + return Ok(DataColumnSidecarList::empty()); + } let kzg_commitments = block .message() .body() @@ -239,6 +246,7 @@ pub type FixedDataColumnSidecarList = #[cfg(test)] mod test { + use super::*; use crate::beacon_block::EmptyBlock; use crate::beacon_block_body::KzgCommitments; use crate::eth_spec::EthSpec; @@ -247,10 +255,24 @@ mod test { DataColumnSidecar, MainnetEthSpec, SignedBeaconBlock, }; use bls::Signature; - use eth2_network_config::TRUSTED_SETUP_BYTES; - use kzg::{Kzg, KzgCommitment, KzgProof, TrustedSetup}; + use kzg::{KzgCommitment, KzgProof}; use std::sync::Arc; + #[test] + fn test_build_sidecars_empty() { + type E = MainnetEthSpec; + let num_of_blobs = 0; + let spec = E::default_spec(); + let (signed_block, blob_sidecars) = + create_test_block_and_blob_sidecars::(num_of_blobs, &spec); + + let mock_kzg = Arc::new(Kzg::default()); + let column_sidecars = + DataColumnSidecar::build_sidecars(&blob_sidecars, &signed_block, &mock_kzg).unwrap(); + + 
assert!(column_sidecars.is_empty()); + } + #[test] fn test_build_sidecars() { type E = MainnetEthSpec; @@ -259,11 +281,13 @@ mod test { let (signed_block, blob_sidecars) = create_test_block_and_blob_sidecars::(num_of_blobs, &spec); - let trusted_setup: TrustedSetup = serde_json::from_reader(TRUSTED_SETUP_BYTES).unwrap(); - let kzg = Arc::new(Kzg::new_from_trusted_setup(trusted_setup).unwrap()); + let mut mock_kzg = Kzg::default(); + mock_kzg + .expect_compute_cells_and_proofs() + .returning(kzg::mock::compute_cells_and_proofs); let column_sidecars = - DataColumnSidecar::build_sidecars(&blob_sidecars, &signed_block, &kzg).unwrap(); + DataColumnSidecar::build_sidecars(&blob_sidecars, &signed_block, &mock_kzg).unwrap(); let block_kzg_commitments = signed_block .message() diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs index b35abf2bbd6..94f06317360 100644 --- a/consensus/types/src/data_column_subnet_id.rs +++ b/consensus/types/src/data_column_subnet_id.rs @@ -1,4 +1,5 @@ //! Identifies each data column subnet by an integer identifier. 
+use crate::data_column_sidecar::ColumnIndex; use crate::EthSpec; use ethereum_types::U256; use safe_arith::{ArithError, SafeArith}; @@ -45,10 +46,10 @@ impl DataColumnSubnetId { } #[allow(clippy::arithmetic_side_effects)] - pub fn columns(&self) -> impl Iterator { + pub fn columns(&self) -> impl Iterator { let subnet = self.0; let data_column_subnet_count = E::data_column_subnet_count() as u64; - let columns_per_subnet = (E::number_of_columns() as u64) / data_column_subnet_count; + let columns_per_subnet = E::data_columns_per_subnet() as u64; (0..columns_per_subnet).map(move |i| data_column_subnet_count * i + subnet) } @@ -86,7 +87,7 @@ impl DataColumnSubnetId { pub fn compute_custody_columns( node_id: U256, custody_subnet_count: u64, - ) -> impl Iterator { + ) -> impl Iterator { Self::compute_custody_subnets::(node_id, custody_subnet_count) .flat_map(|subnet| subnet.columns::()) } @@ -150,8 +151,10 @@ impl From for Error { #[cfg(test)] mod test { use crate::data_column_subnet_id::DataColumnSubnetId; - use crate::ChainSpec; use crate::EthSpec; + use crate::{ChainSpec, MainnetEthSpec}; + + type E = MainnetEthSpec; #[test] fn test_compute_subnets_for_data_column() { @@ -174,20 +177,18 @@ mod test { let spec = ChainSpec::mainnet(); for node_id in node_ids { - let computed_subnets = DataColumnSubnetId::compute_custody_subnets::< - crate::MainnetEthSpec, - >(node_id, spec.custody_requirement); + let computed_subnets = + DataColumnSubnetId::compute_custody_subnets::(node_id, spec.custody_requirement); let computed_subnets: Vec<_> = computed_subnets.collect(); // the number of subnets is equal to the custody requirement assert_eq!(computed_subnets.len() as u64, spec.custody_requirement); - let subnet_count = crate::MainnetEthSpec::data_column_subnet_count(); - let columns_per_subnet = crate::MainnetEthSpec::number_of_columns() / subnet_count; + let subnet_count = E::data_column_subnet_count(); for subnet in computed_subnets { - let columns: Vec<_> = 
subnet.columns::().collect(); + let columns: Vec<_> = subnet.columns::().collect(); // the number of columns is equal to the specified number of columns per subnet - assert_eq!(columns.len(), columns_per_subnet); + assert_eq!(columns.len(), E::data_columns_per_subnet()); for pair in columns.windows(2) { // each successive column index is offset by the number of subnets diff --git a/consensus/types/src/eth_spec.rs b/consensus/types/src/eth_spec.rs index 01a6c005f0e..1cb5c8d11b8 100644 --- a/consensus/types/src/eth_spec.rs +++ b/consensus/types/src/eth_spec.rs @@ -119,7 +119,7 @@ pub trait EthSpec: type MinCustodyRequirement: Unsigned + Clone + Sync + Send + Debug + PartialEq; type DataColumnSubnetCount: Unsigned + Clone + Sync + Send + Debug + PartialEq; type DataColumnCount: Unsigned + Clone + Sync + Send + Debug + PartialEq; - type MaxBytesPerColumn: Unsigned + Clone + Sync + Send + Debug + PartialEq; + type DataColumnsPerSubnet: Unsigned + Clone + Sync + Send + Debug + PartialEq; type KzgCommitmentsInclusionProofDepth: Unsigned + Clone + Sync + Send + Debug + PartialEq; /* * Derived values (set these CAREFULLY) @@ -350,6 +350,10 @@ pub trait EthSpec: Self::DataColumnCount::to_usize() } + fn data_columns_per_subnet() -> usize { + Self::DataColumnsPerSubnet::to_usize() + } + fn min_custody_requirement() -> usize { Self::MinCustodyRequirement::to_usize() } @@ -358,10 +362,6 @@ pub trait EthSpec: Self::DataColumnSubnetCount::to_usize() } - fn max_bytes_per_column() -> usize { - Self::MaxBytesPerColumn::to_usize() - } - fn kzg_commitments_inclusion_proof_depth() -> usize { Self::KzgCommitmentsInclusionProofDepth::to_usize() } @@ -414,10 +414,7 @@ impl EthSpec for MainnetEthSpec { type MinCustodyRequirement = U1; type DataColumnSubnetCount = U32; type DataColumnCount = U128; - // Column samples are entire columns in 1D DAS. 
- // max data size = extended_blob_bytes * max_blobs_per_block / num_of_columns - // 256kb * 32 / 128 = 64kb - type MaxBytesPerColumn = U65536; + type DataColumnsPerSubnet = U4; type KzgCommitmentsInclusionProofDepth = U4; // inclusion of the whole list of commitments type SyncSubcommitteeSize = U128; // 512 committee size / 4 sync committee subnet count type MaxPendingAttestations = U4096; // 128 max attestations * 32 slots per epoch @@ -466,7 +463,7 @@ impl EthSpec for MinimalEthSpec { type MinCustodyRequirement = U1; type DataColumnSubnetCount = U32; type DataColumnCount = U128; - type MaxBytesPerColumn = U65536; + type DataColumnsPerSubnet = U4; type KzgCommitmentsInclusionProofDepth = U4; params_from_eth_spec!(MainnetEthSpec { @@ -563,7 +560,7 @@ impl EthSpec for GnosisEthSpec { type MinCustodyRequirement = U1; type DataColumnSubnetCount = U32; type DataColumnCount = U128; - type MaxBytesPerColumn = U65536; + type DataColumnsPerSubnet = U4; type KzgCommitmentsInclusionProofDepth = U4; fn default_spec() -> ChainSpec { @@ -574,3 +571,22 @@ impl EthSpec for GnosisEthSpec { EthSpecId::Gnosis } } + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_peer_das_config_all_specs() { + test_peer_das_config::(); + test_peer_das_config::(); + test_peer_das_config::(); + } + + fn test_peer_das_config() { + assert_eq!( + E::data_columns_per_subnet(), + E::number_of_columns() / E::data_column_subnet_count() + ); + } +} diff --git a/crypto/kzg/Cargo.toml b/crypto/kzg/Cargo.toml index d26dfe4992a..2c3c894b49c 100644 --- a/crypto/kzg/Cargo.toml +++ b/crypto/kzg/Cargo.toml @@ -17,3 +17,4 @@ ethereum_serde_utils = { workspace = true } hex = { workspace = true } ethereum_hashing = { workspace = true } c-kzg = { workspace = true } +mockall = { workspace = true } diff --git a/crypto/kzg/src/lib.rs b/crypto/kzg/src/lib.rs index 60eb0dfe0e7..bf63390d108 100644 --- a/crypto/kzg/src/lib.rs +++ b/crypto/kzg/src/lib.rs @@ -14,6 +14,8 @@ pub use c_kzg::{ 
BYTES_PER_FIELD_ELEMENT, BYTES_PER_PROOF, FIELD_ELEMENTS_PER_BLOB, }; use c_kzg::{Cell, CELLS_PER_BLOB}; +use mockall::automock; + #[derive(Debug)] pub enum Error { /// An error from the underlying kzg library. @@ -34,6 +36,7 @@ pub struct Kzg { trusted_setup: KzgSettings, } +#[automock] impl Kzg { /// Load the kzg trusted setup parameters from a vec of G1 and G2 points. pub fn new_from_trusted_setup(trusted_setup: TrustedSetup) -> Result { @@ -191,6 +194,24 @@ impl Kzg { } } +pub mod mock { + use crate::{Error, KzgProof}; + use c_kzg::{Blob, Cell, CELLS_PER_BLOB}; + + pub const MOCK_KZG_BYTES_PER_CELL: usize = 2048; + + #[allow(clippy::type_complexity)] + pub fn compute_cells_and_proofs( + _blob: &Blob, + ) -> Result<(Box<[Cell; CELLS_PER_BLOB]>, Box<[KzgProof; CELLS_PER_BLOB]>), Error> { + let empty_cell = Cell::new([0; MOCK_KZG_BYTES_PER_CELL]); + Ok(( + Box::new([empty_cell; CELLS_PER_BLOB]), + Box::new([KzgProof::empty(); CELLS_PER_BLOB]), + )) + } +} + impl TryFrom for Kzg { type Error = Error; diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index ec10ff4429d..bff39154240 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -778,6 +778,13 @@ fn network_target_peers_flag() { }); } #[test] +fn network_subscribe_all_data_column_subnets_flag() { + CommandLineTest::new() + .flag("subscribe-all-data-column-subnets", None) + .run_with_zero_port() + .with_config(|config| assert!(config.network.subscribe_all_data_column_subnets)); +} +#[test] fn network_subscribe_all_subnets_flag() { CommandLineTest::new() .flag("subscribe-all-subnets", None)