diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index c3e77ae225e..76dbd483b57 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -1,9 +1,11 @@ +use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY; use crate::discovery::CombinedKey; use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId}; use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo}; use rand::seq::SliceRandom; use score::{PeerAction, ReportSource, Score, ScoreState}; use slog::{crit, debug, error, trace, warn}; +use ssz::Encode; use std::net::IpAddr; use std::time::Instant; use std::{cmp::Ordering, fmt::Display}; @@ -673,9 +675,23 @@ impl PeerDB { } /// Updates the connection state. MUST ONLY BE USED IN TESTS. - pub fn __add_connected_peer_testing_only(&mut self, peer_id: &PeerId) -> Option { + pub fn __add_connected_peer_testing_only( + &mut self, + peer_id: &PeerId, + supernode: bool, + ) -> Option { let enr_key = CombinedKey::generate_secp256k1(); - let enr = Enr::builder().build(&enr_key).unwrap(); + let mut enr = Enr::builder().build(&enr_key).unwrap(); + + if supernode { + enr.insert( + PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, + &(E::data_column_subnet_count() as u64).as_ssz_bytes(), + &enr_key, + ) + .expect("u64 can be encoded"); + } + self.update_connection_state( peer_id, NewConnectionState::Connected { diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 5dd74f25726..de4ef7785fd 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -1,4 +1,5 @@ //! A collection of variables that are accessible outside of the network thread itself. +use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY; use crate::peer_manager::peerdb::PeerDB; use crate::rpc::{MetaData, MetaDataV2}; use crate::types::{BackFillState, SyncState}; @@ -6,6 +7,7 @@ use crate::EnrExt; use crate::{Client, Eth2Enr}; use crate::{Enr, GossipTopic, Multiaddr, PeerId}; use parking_lot::RwLock; +use ssz::Encode; use std::collections::HashSet; use types::data_column_sidecar::ColumnIndex; use types::{DataColumnSubnetId, Epoch, EthSpec}; @@ -141,6 +143,22 @@ impl NetworkGlobals { log, ) } + + /// TESTING ONLY. Set a custody_subnet_count value + pub fn test_mutate_custody_subnet_count(&mut self, value: u64) { + use crate::CombinedKeyExt; + // For test: use a random key. WARNING: changes ENR NodeID + let keypair = libp2p::identity::secp256k1::Keypair::generate(); + let enr_key = discv5::enr::CombinedKey::from_secp256k1(&keypair); + self.local_enr + .write() + .insert( + PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, + &value.as_ssz_bytes(), + &enr_key, + ) + .expect("u64 can be serialized"); + } } #[cfg(test)] diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index ff1613f9e0b..526936f00fd 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -74,19 +74,25 @@ impl SingleBlockLookup { awaiting_parent: Option, custody_column_indexes: Vec, ) -> Self { + let column_count = custody_column_indexes.len(); + let custody_columns_requests = custody_column_indexes + .into_iter() + .map(|column_index| { + let request = CustodyColumnRequestState::new(block_root, column_index); + (column_index, request) + }) + .collect::>(); + debug_assert_eq!( + column_count, + custody_columns_requests.len(), + "duplicate column indexes" + ); + Self { id, block_request_state: BlockRequestState::new(block_root), blob_request_state: BlobRequestState::new(block_root), - custody_columns_requests: custody_column_indexes - .into_iter() - .map(|column_index| { - ( - column_index, - CustodyColumnRequestState::new(block_root, column_index), - ) - }) - .collect(), + custody_columns_requests, block_root, awaiting_parent, peers: peers.iter().copied().collect(), diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 7dfb46336ca..f6b1deb0672 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -89,6 +89,7 @@ type SamplingIds = Vec<(DataColumnsByRootRequestId, ColumnIndex)>; struct TestRigConfig { peer_das_enabled: bool, + supernode: bool, } impl TestRig { @@ -99,7 +100,7 @@ impl TestRig { // Use `fork_from_env` logic to set correct fork epochs let mut spec = test_spec::(); - if let Some(config) = config { + if let Some(config) = &config { if config.peer_das_enabled { spec.eip7594_fork_epoch = Some(Epoch::new(0)); } @@ -123,9 +124,15 @@ impl TestRig { let (network_tx, network_rx) = mpsc::unbounded_channel(); // TODO(das): make the generation of the ENR use the deterministic rng to have consistent // column assignments - let globals = Arc::new(NetworkGlobals::new_test_globals(Vec::new(), &log)); + let mut globals = NetworkGlobals::new_test_globals(Vec::new(), &log); + if let Some(config) = &config { + if config.supernode { + globals.test_mutate_custody_subnet_count(E::data_column_subnet_count() as u64); + } + } + let (beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing( - globals, + Arc::new(globals), chain.clone(), harness.runtime.task_executor.clone(), log.clone(), @@ -179,6 +186,19 @@ impl TestRig { fn test_setup_after_peerdas() -> Option { let r = Self::test_setup_with_config(Some(TestRigConfig { peer_das_enabled: true, + supernode: false, + })); + if r.after_deneb() { + Some(r) + } else { + None + } + } + + fn test_setup_after_peerdas_supernode() -> Option { + let r = Self::test_setup_with_config(Some(TestRigConfig { + peer_das_enabled: true, + supernode: true, })); if r.after_deneb() { Some(r) @@ -356,12 +376,26 @@ impl TestRig { self.network_globals .peers .write() - .__add_connected_peer_testing_only(&peer_id); + .__add_connected_peer_testing_only(&peer_id, false); + peer_id + } + + fn new_connected_supernode_peer(&mut self) -> PeerId { + let peer_id = PeerId::random(); + self.network_globals + .peers + .write() + .__add_connected_peer_testing_only(&peer_id, true); peer_id } - fn new_connected_peers(&mut self, count: usize) -> Vec { - (0..count).map(|_| self.new_connected_peer()).collect() + fn new_connected_peers_for_peerdas(&mut self) { + // Enough sampling peers with few columns + for _ in 0..100 { + self.new_connected_peer(); + } + // One supernode peer to ensure all columns have at least one peer + self.new_connected_supernode_peer(); } fn parent_chain_processed_success( @@ -1584,7 +1618,7 @@ fn sampling_happy_path() { let Some(mut r) = TestRig::test_setup_after_peerdas() else { return; }; - r.new_connected_peers(100); // Add enough sampling peers + r.new_connected_peers_for_peerdas(); let (block, data_columns) = r.rand_block_and_data_columns(); let block_root = block.canonical_root(); r.trigger_sample_block(block_root, block.slot()); @@ -1601,7 +1635,7 @@ fn sampling_with_retries() { let Some(mut r) = TestRig::test_setup_after_peerdas() else { return; }; - r.new_connected_peers(100); // Add enough sampling peers + r.new_connected_peers_for_peerdas(); let (block, data_columns) = r.rand_block_and_data_columns(); let block_root = block.canonical_root(); r.trigger_sample_block(block_root, block.slot()); @@ -1621,7 +1655,7 @@ fn custody_lookup_happy_path() { let Some(mut r) = TestRig::test_setup_after_peerdas() else { return; }; - r.new_connected_peers(100); // Add enough sampling peers + r.new_connected_peers_for_peerdas(); let (block, data_columns) = r.rand_block_and_data_columns(); let block_root = block.canonical_root(); let peer_id = r.new_connected_peer(); @@ -1654,6 +1688,25 @@ fn custody_lookup_no_peers_available_initially() { r.expect_no_active_lookups(); } +#[test] +fn custody_lookup_reconstruction() { + let Some(mut r) = TestRig::test_setup_after_peerdas_supernode() else { + return; + }; + r.new_connected_peers_for_peerdas(); + let (block, data_columns) = r.rand_block_and_data_columns(); + let block_root = block.canonical_root(); + let peer_id = r.new_connected_peer(); + r.trigger_unknown_block_from_attestation(block_root, peer_id); + // Should not request blobs + let id = r.expect_block_lookup_request(block.canonical_root()); + r.complete_valid_block_request(id, block.into(), true); + let custody_column_count = E::data_column_subnet_count() * E::data_columns_per_subnet(); + let custody_ids = r.expect_only_data_columns_by_root_requests(block_root, custody_column_count); + r.complete_valid_custody_request(custody_ids, data_columns, false); + r.expect_no_active_lookups(); +} + // TODO(das): Test retries of DataColumnByRoot: // - Expect request for column_index // - Respond with bad data