Skip to content

Commit

Permalink
Add supernode peer and reconstruction test
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed May 13, 2024
1 parent ddda406 commit bb2e27f
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 20 deletions.
20 changes: 18 additions & 2 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY;
use crate::discovery::CombinedKey;
use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, Gossipsub, PeerId};
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use rand::seq::SliceRandom;
use score::{PeerAction, ReportSource, Score, ScoreState};
use slog::{crit, debug, error, trace, warn};
use ssz::Encode;
use std::net::IpAddr;
use std::time::Instant;
use std::{cmp::Ordering, fmt::Display};
Expand Down Expand Up @@ -673,9 +675,23 @@ impl<E: EthSpec> PeerDB<E> {
}

/// Updates the connection state. MUST ONLY BE USED IN TESTS.
pub fn __add_connected_peer_testing_only(&mut self, peer_id: &PeerId) -> Option<BanOperation> {
pub fn __add_connected_peer_testing_only(
&mut self,
peer_id: &PeerId,
supernode: bool,
) -> Option<BanOperation> {
let enr_key = CombinedKey::generate_secp256k1();
let enr = Enr::builder().build(&enr_key).unwrap();
let mut enr = Enr::builder().build(&enr_key).unwrap();

if supernode {
enr.insert(
PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY,
&(E::data_column_subnet_count() as u64).as_ssz_bytes(),
&enr_key,
)
.expect("u64 can be encoded");
}

self.update_connection_state(
peer_id,
NewConnectionState::Connected {
Expand Down
18 changes: 18 additions & 0 deletions beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
//! A collection of variables that are accessible outside of the network thread itself.
use crate::discovery::enr::PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY;
use crate::peer_manager::peerdb::PeerDB;
use crate::rpc::{MetaData, MetaDataV2};
use crate::types::{BackFillState, SyncState};
use crate::EnrExt;
use crate::{Client, Eth2Enr};
use crate::{Enr, GossipTopic, Multiaddr, PeerId};
use parking_lot::RwLock;
use ssz::Encode;
use std::collections::HashSet;
use types::data_column_sidecar::ColumnIndex;
use types::{DataColumnSubnetId, Epoch, EthSpec};
Expand Down Expand Up @@ -141,6 +143,22 @@ impl<E: EthSpec> NetworkGlobals<E> {
log,
)
}

/// TESTING ONLY. Set a custody_subnet_count value
pub fn test_mutate_custody_subnet_count(&mut self, value: u64) {
use crate::CombinedKeyExt;
// For test: use a random key. WARNING: changes ENR NodeID
let keypair = libp2p::identity::secp256k1::Keypair::generate();
let enr_key = discv5::enr::CombinedKey::from_secp256k1(&keypair);
self.local_enr
.write()
.insert(
PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY,
&value.as_ssz_bytes(),
&enr_key,
)
.expect("u64 can be serialized");
}
}

#[cfg(test)]
Expand Down
24 changes: 15 additions & 9 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,25 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
awaiting_parent: Option<Hash256>,
custody_column_indexes: Vec<ColumnIndex>,
) -> Self {
let column_count = custody_column_indexes.len();
let custody_columns_requests = custody_column_indexes
.into_iter()
.map(|column_index| {
let request = CustodyColumnRequestState::new(block_root, column_index);
(column_index, request)
})
.collect::<HashMap<_, _>>();
debug_assert_eq!(
column_count,
custody_columns_requests.len(),
"duplicate column indexes"
);

Self {
id,
block_request_state: BlockRequestState::new(block_root),
blob_request_state: BlobRequestState::new(block_root),
custody_columns_requests: custody_column_indexes
.into_iter()
.map(|column_index| {
(
column_index,
CustodyColumnRequestState::new(block_root, column_index),
)
})
.collect(),
custody_columns_requests,
block_root,
awaiting_parent,
peers: peers.iter().copied().collect(),
Expand Down
71 changes: 62 additions & 9 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type SamplingIds = Vec<(DataColumnsByRootRequestId, ColumnIndex)>;

struct TestRigConfig {
peer_das_enabled: bool,
supernode: bool,
}

impl TestRig {
Expand All @@ -99,7 +100,7 @@ impl TestRig {
// Use `fork_from_env` logic to set correct fork epochs
let mut spec = test_spec::<E>();

if let Some(config) = config {
if let Some(config) = &config {
if config.peer_das_enabled {
spec.eip7594_fork_epoch = Some(Epoch::new(0));
}
Expand All @@ -123,9 +124,15 @@ impl TestRig {
let (network_tx, network_rx) = mpsc::unbounded_channel();
// TODO(das): make the generation of the ENR use the deterministic rng to have consistent
// column assignments
let globals = Arc::new(NetworkGlobals::new_test_globals(Vec::new(), &log));
let mut globals = NetworkGlobals::new_test_globals(Vec::new(), &log);
if let Some(config) = &config {
if config.supernode {
globals.test_mutate_custody_subnet_count(E::data_column_subnet_count() as u64);
}
}

let (beacon_processor, beacon_processor_rx) = NetworkBeaconProcessor::null_for_testing(
globals,
Arc::new(globals),
chain.clone(),
harness.runtime.task_executor.clone(),
log.clone(),
Expand Down Expand Up @@ -179,6 +186,19 @@ impl TestRig {
fn test_setup_after_peerdas() -> Option<Self> {
let r = Self::test_setup_with_config(Some(TestRigConfig {
peer_das_enabled: true,
supernode: false,
}));
if r.after_deneb() {
Some(r)
} else {
None
}
}

fn test_setup_after_peerdas_supernode() -> Option<Self> {
let r = Self::test_setup_with_config(Some(TestRigConfig {
peer_das_enabled: true,
supernode: true,
}));
if r.after_deneb() {
Some(r)
Expand Down Expand Up @@ -356,12 +376,26 @@ impl TestRig {
self.network_globals
.peers
.write()
.__add_connected_peer_testing_only(&peer_id);
.__add_connected_peer_testing_only(&peer_id, false);
peer_id
}

fn new_connected_supernode_peer(&mut self) -> PeerId {
let peer_id = PeerId::random();
self.network_globals
.peers
.write()
.__add_connected_peer_testing_only(&peer_id, true);
peer_id
}

fn new_connected_peers(&mut self, count: usize) -> Vec<PeerId> {
(0..count).map(|_| self.new_connected_peer()).collect()
fn new_connected_peers_for_peerdas(&mut self) {
// Enough sampling peers with few columns
for _ in 0..100 {
self.new_connected_peer();
}
// One supernode peer to ensure all columns have at least one peer
self.new_connected_supernode_peer();
}

fn parent_chain_processed_success(
Expand Down Expand Up @@ -1584,7 +1618,7 @@ fn sampling_happy_path() {
let Some(mut r) = TestRig::test_setup_after_peerdas() else {
return;
};
r.new_connected_peers(100); // Add enough sampling peers
r.new_connected_peers_for_peerdas();
let (block, data_columns) = r.rand_block_and_data_columns();
let block_root = block.canonical_root();
r.trigger_sample_block(block_root, block.slot());
Expand All @@ -1601,7 +1635,7 @@ fn sampling_with_retries() {
let Some(mut r) = TestRig::test_setup_after_peerdas() else {
return;
};
r.new_connected_peers(100); // Add enough sampling peers
r.new_connected_peers_for_peerdas();
let (block, data_columns) = r.rand_block_and_data_columns();
let block_root = block.canonical_root();
r.trigger_sample_block(block_root, block.slot());
Expand All @@ -1621,7 +1655,7 @@ fn custody_lookup_happy_path() {
let Some(mut r) = TestRig::test_setup_after_peerdas() else {
return;
};
r.new_connected_peers(100); // Add enough sampling peers
r.new_connected_peers_for_peerdas();
let (block, data_columns) = r.rand_block_and_data_columns();
let block_root = block.canonical_root();
let peer_id = r.new_connected_peer();
Expand Down Expand Up @@ -1654,6 +1688,25 @@ fn custody_lookup_no_peers_available_initially() {
r.expect_no_active_lookups();
}

#[test]
fn custody_lookup_reconstruction() {
let Some(mut r) = TestRig::test_setup_after_peerdas_supernode() else {
return;
};
r.new_connected_peers_for_peerdas();
let (block, data_columns) = r.rand_block_and_data_columns();
let block_root = block.canonical_root();
let peer_id = r.new_connected_peer();
r.trigger_unknown_block_from_attestation(block_root, peer_id);
// Should not request blobs
let id = r.expect_block_lookup_request(block.canonical_root());
r.complete_valid_block_request(id, block.into(), true);
let custody_column_count = E::data_column_subnet_count() * E::data_columns_per_subnet();
let custody_ids = r.expect_only_data_columns_by_root_requests(block_root, custody_column_count);
r.complete_valid_custody_request(custody_ids, data_columns, false);
r.expect_no_active_lookups();
}

// TODO(das): Test retries of DataColumnByRoot:
// - Expect request for column_index
// - Respond with bad data
Expand Down

0 comments on commit bb2e27f

Please sign in to comment.