Skip to content

Commit

Permalink
Improve get_custody_columns validation, caching and error handling (#6308)
Browse files Browse the repository at this point in the history

* Improve `get_custody_columns` validation, caching and error handling.

* Merge branch 'unstable' into get-custody-columns-error-handing

* Fix failing test and add more tests.

* Fix failing test and add more tests.

* Merge branch 'unstable' into get-custody-columns-error-handing

# Conflicts:
#	beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs
#	beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
#	beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs
#	beacon_node/lighthouse_network/src/types/globals.rs
#	beacon_node/network/src/service.rs
#	consensus/types/src/data_column_subnet_id.rs

* Add unit test to make sure the default specs won't panic on the `compute_custody_requirement_subnets` function.

* Add condition when calling `compute_custody_subnets_from_metadata` and update logs.

* Validate `csc` when returning from enr. Remove `csc` computation on connection since we get them on metadata anyway.

* Add `peers_per_custody_subnet_count` to track peer csc and supernodes.

* Disconnect peers with invalid metadata and find other peers instead.

* Fix sampling tests.

* Merge branch 'unstable' into get-custody-columns-error-handing

* Merge branch 'unstable' into get-custody-columns-error-handing
  • Loading branch information
jimmygchen authored Sep 6, 2024
1 parent 0c5e25b commit c0b4f01
Show file tree
Hide file tree
Showing 13 changed files with 293 additions and 236 deletions.
2 changes: 1 addition & 1 deletion beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
}

if let Some(gossip_verified_data_columns) = gossip_verified_data_columns {
let custody_columns_indices = network_globals.custody_columns();
let custody_columns_indices = &network_globals.custody_columns;

let custody_columns = gossip_verified_data_columns
.into_iter()
Expand Down
45 changes: 14 additions & 31 deletions beacon_node/lighthouse_network/src/discovery/enr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub trait Eth2Enr {
) -> Result<EnrSyncCommitteeBitfield<E>, &'static str>;

/// The peerdas custody subnet count associated with the ENR.
fn custody_subnet_count<E: EthSpec>(&self, spec: &ChainSpec) -> u64;
fn custody_subnet_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str>;

fn eth2(&self) -> Result<EnrForkId, &'static str>;
}
Expand All @@ -64,14 +64,17 @@ impl Eth2Enr for Enr {
.map_err(|_| "Could not decode the ENR syncnets bitfield")
}

/// if the custody value is non-existent in the ENR, then we assume the minimum custody value
/// defined in the spec.
fn custody_subnet_count<E: EthSpec>(&self, spec: &ChainSpec) -> u64 {
self.get_decodable::<u64>(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY)
.and_then(|r| r.ok())
// If value supplied in ENR is invalid, fallback to `custody_requirement`
.filter(|csc| csc <= &spec.data_column_sidecar_subnet_count)
.unwrap_or(spec.custody_requirement)
/// Reads the peerdas custody subnet count from the ENR.
///
/// Returns an error if the key is absent, fails to decode as a `u64`, or the
/// value lies outside the inclusive range
/// [`custody_requirement`, `data_column_sidecar_subnet_count`] from the spec.
fn custody_subnet_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str> {
let csc = self
.get_decodable::<u64>(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY)
.ok_or("ENR custody subnet count non-existent")?
.map_err(|_| "Could not decode the ENR custody subnet count")?;

// Only accept values within the spec-defined bounds; anything else is
// treated as an invalid advertisement rather than silently clamped.
if csc >= spec.custody_requirement && csc <= spec.data_column_sidecar_subnet_count {
Ok(csc)
} else {
Err("Invalid custody subnet count in ENR")
}
}

fn eth2(&self) -> Result<EnrForkId, &'static str> {
Expand Down Expand Up @@ -335,7 +338,7 @@ mod test {
let enr = build_enr_with_config(config, &spec).0;

assert_eq!(
enr.custody_subnet_count::<E>(&spec),
enr.custody_subnet_count::<E>(&spec).unwrap(),
spec.custody_requirement,
);
}
Expand All @@ -350,31 +353,11 @@ mod test {
let enr = build_enr_with_config(config, &spec).0;

assert_eq!(
enr.custody_subnet_count::<E>(&spec),
enr.custody_subnet_count::<E>(&spec).unwrap(),
spec.data_column_sidecar_subnet_count,
);
}

#[test]
fn custody_subnet_count_fallback_default() {
let config = NetworkConfig::default();
let spec = make_eip7594_spec();
let (mut enr, enr_key) = build_enr_with_config(config, &spec);
let invalid_subnet_count = 999u64;

enr.insert(
PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY,
&invalid_subnet_count,
&enr_key,
)
.unwrap();

assert_eq!(
enr.custody_subnet_count::<E>(&spec),
spec.custody_requirement,
);
}

fn build_enr_with_config(config: NetworkConfig, spec: &ChainSpec) -> (Enr, CombinedKey) {
let keypair = libp2p::identity::secp256k1::Keypair::generate();
let enr_key = CombinedKey::from_secp256k1(&keypair);
Expand Down
19 changes: 10 additions & 9 deletions beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ where
E: EthSpec,
{
let log_clone = log.clone();
let spec_clone = spec.clone();

move |enr: &Enr| {
let attestation_bitfield: EnrAttestationBitfield<E> = match enr.attestation_bitfield::<E>()
Expand All @@ -30,8 +29,6 @@ where
let sync_committee_bitfield: Result<EnrSyncCommitteeBitfield<E>, _> =
enr.sync_committee_bitfield::<E>();

let custody_subnet_count = enr.custody_subnet_count::<E>(&spec_clone);

let predicate = subnets.iter().any(|subnet| match subnet {
Subnet::Attestation(s) => attestation_bitfield
.get(*s.deref() as usize)
Expand All @@ -40,12 +37,16 @@ where
.as_ref()
.map_or(false, |b| b.get(*s.deref() as usize).unwrap_or(false)),
Subnet::DataColumn(s) => {
let mut subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
enr.node_id().raw(),
custody_subnet_count,
&spec,
);
subnets.contains(s)
if let Ok(custody_subnet_count) = enr.custody_subnet_count::<E>(&spec) {
DataColumnSubnetId::compute_custody_subnets::<E>(
enr.node_id().raw(),
custody_subnet_count,
&spec,
)
.map_or(false, |mut subnets| subnets.contains(s))
} else {
false
}
}
});

Expand Down
9 changes: 9 additions & 0 deletions beacon_node/lighthouse_network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ pub static PEERS_PER_CLIENT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
&["Client"],
)
});

/// Gauge vector counting connected peers, labelled by the custody subnet
/// count each peer advertises (read from MetaData V3 where available).
pub static PEERS_PER_CUSTODY_SUBNET_COUNT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
try_create_int_gauge_vec(
"peers_per_custody_subnet_count",
"The current count of peers by custody subnet count",
&["custody_subnet_count"],
)
});

pub static FAILED_ATTESTATION_PUBLISHES_PER_SUBNET: LazyLock<Result<IntGaugeVec>> =
LazyLock::new(|| {
try_create_int_gauge_vec(
Expand Down
140 changes: 96 additions & 44 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use types::{EthSpec, SyncSubnetId};
use types::{DataColumnSubnetId, EthSpec, SyncSubnetId};

pub use libp2p::core::Multiaddr;
pub use libp2p::identity::Keypair;
Expand All @@ -33,7 +33,7 @@ pub use peerdb::peer_info::{
};
use peerdb::score::{PeerAction, ReportSource};
pub use peerdb::sync_status::{SyncInfo, SyncStatus};
use std::collections::{hash_map::Entry, HashMap};
use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::net::IpAddr;
use strum::IntoEnumIterator;

Expand Down Expand Up @@ -701,6 +701,8 @@ impl<E: EthSpec> PeerManager<E> {

/// Received a metadata response from a peer.
pub fn meta_data_response(&mut self, peer_id: &PeerId, meta_data: MetaData<E>) {
let mut invalid_meta_data = false;

if let Some(peer_info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
if let Some(known_meta_data) = &peer_info.meta_data() {
if *known_meta_data.seq_number() < *meta_data.seq_number() {
Expand All @@ -717,12 +719,39 @@ impl<E: EthSpec> PeerManager<E> {
debug!(self.log, "Obtained peer's metadata";
"peer_id" => %peer_id, "new_seq_no" => meta_data.seq_number());
}
let node_id_opt = peer_id_to_node_id(peer_id).ok();
peer_info.set_meta_data(meta_data, node_id_opt, &self.network_globals.spec);

let custody_subnet_count_opt = meta_data.custody_subnet_count().copied().ok();
peer_info.set_meta_data(meta_data);

if self.network_globals.spec.is_peer_das_scheduled() {
// Gracefully ignore metadata/v2 peers. Potentially downscore after PeerDAS to
// prioritize PeerDAS peers.
if let Some(custody_subnet_count) = custody_subnet_count_opt {
match self.compute_peer_custody_subnets(peer_id, custody_subnet_count) {
Ok(custody_subnets) => {
peer_info.set_custody_subnets(custody_subnets);
}
Err(err) => {
debug!(self.log, "Unable to compute peer custody subnets from metadata";
"info" => "Sending goodbye to peer",
"peer_id" => %peer_id,
"custody_subnet_count" => custody_subnet_count,
"error" => ?err,
);
invalid_meta_data = true;
}
};
}
}
} else {
error!(self.log, "Received METADATA from an unknown peer";
"peer_id" => %peer_id);
}

// Disconnect peers with invalid metadata and find other peers instead.
if invalid_meta_data {
self.goodbye_peer(peer_id, GoodbyeReason::Fault, ReportSource::PeerManager)
}
}

/// Updates the gossipsub scores for all known peers in gossipsub.
Expand Down Expand Up @@ -1290,6 +1319,7 @@ impl<E: EthSpec> PeerManager<E> {
let mut peers_connected = 0;
let mut clients_per_peer = HashMap::new();
let mut peers_connected_mutli: HashMap<(&str, &str), i32> = HashMap::new();
let mut peers_per_custody_subnet_count: HashMap<u64, i64> = HashMap::new();

for (_, peer_info) in self.network_globals.peers.read().connected_peers() {
peers_connected += 1;
Expand Down Expand Up @@ -1320,11 +1350,26 @@ impl<E: EthSpec> PeerManager<E> {
*peers_connected_mutli
.entry((direction, transport))
.or_default() += 1;

if let Some(MetaData::V3(meta_data)) = peer_info.meta_data() {
*peers_per_custody_subnet_count
.entry(meta_data.custody_subnet_count)
.or_default() += 1;
}
}

// PEERS_CONNECTED
metrics::set_gauge(&metrics::PEERS_CONNECTED, peers_connected);

// CUSTODY_SUBNET_COUNT
for (custody_subnet_count, peer_count) in peers_per_custody_subnet_count.into_iter() {
metrics::set_gauge_vec(
&metrics::PEERS_PER_CUSTODY_SUBNET_COUNT,
&[&custody_subnet_count.to_string()],
peer_count,
)
}

// PEERS_PER_CLIENT
for client_kind in ClientKind::iter() {
let value = clients_per_peer.get(&client_kind.to_string()).unwrap_or(&0);
Expand All @@ -1348,6 +1393,45 @@ impl<E: EthSpec> PeerManager<E> {
}
}
}

/// Computes the set of data-column custody subnets for a peer from its node
/// id and the custody subnet count it advertised in its metadata.
///
/// Returns an error if the peer id cannot be converted to a node id, or if
/// the advertised count is outside the spec's valid inclusive range
/// [`custody_requirement`, `data_column_sidecar_subnet_count`].
fn compute_peer_custody_subnets(
&self,
peer_id: &PeerId,
custody_subnet_count: u64,
) -> Result<HashSet<DataColumnSubnetId>, String> {
// If we don't have a node id, we cannot compute the custody duties anyway
let node_id = peer_id_to_node_id(peer_id)?;
let spec = &self.network_globals.spec;

// Reject counts outside the spec-defined range before computing subnets.
if !(spec.custody_requirement..=spec.data_column_sidecar_subnet_count)
.contains(&custody_subnet_count)
{
return Err("Invalid custody subnet count in metadata: out of range".to_string());
}

let custody_subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
node_id.raw(),
custody_subnet_count,
spec,
)
.map(|subnets| subnets.collect())
.unwrap_or_else(|e| {
// This is an unreachable scenario unless there's a bug, as we've validated the csc
// just above.
error!(
self.log,
"Computing peer custody subnets failed unexpectedly";
"info" => "Falling back to default custody requirement subnets",
"peer_id" => %peer_id,
"custody_subnet_count" => custody_subnet_count,
"error" => ?e
);
DataColumnSubnetId::compute_custody_requirement_subnets::<E>(node_id.raw(), spec)
.collect()
});

Ok(custody_subnets)
}
}

enum ConnectingType {
Expand Down Expand Up @@ -1680,11 +1764,7 @@ mod tests {
.write()
.peer_info_mut(&peer0)
.unwrap()
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
.set_meta_data(MetaData::V2(metadata));
peer_manager
.network_globals
.peers
Expand All @@ -1704,11 +1784,7 @@ mod tests {
.write()
.peer_info_mut(&peer2)
.unwrap()
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
.set_meta_data(MetaData::V2(metadata));
peer_manager
.network_globals
.peers
Expand All @@ -1728,11 +1804,7 @@ mod tests {
.write()
.peer_info_mut(&peer4)
.unwrap()
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
.set_meta_data(MetaData::V2(metadata));
peer_manager
.network_globals
.peers
Expand Down Expand Up @@ -1806,11 +1878,7 @@ mod tests {
.write()
.peer_info_mut(&peer)
.unwrap()
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
.set_meta_data(MetaData::V2(metadata));
peer_manager
.network_globals
.peers
Expand Down Expand Up @@ -1934,11 +2002,7 @@ mod tests {
.write()
.peer_info_mut(&peer)
.unwrap()
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
.set_meta_data(MetaData::V2(metadata));
let long_lived_subnets = peer_manager
.network_globals
.peers
Expand Down Expand Up @@ -2047,11 +2111,7 @@ mod tests {
.write()
.peer_info_mut(&peer)
.unwrap()
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
.set_meta_data(MetaData::V2(metadata));
let long_lived_subnets = peer_manager
.network_globals
.peers
Expand Down Expand Up @@ -2217,11 +2277,7 @@ mod tests {
.write()
.peer_info_mut(&peer)
.unwrap()
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
.set_meta_data(MetaData::V2(metadata));
let long_lived_subnets = peer_manager
.network_globals
.peers
Expand Down Expand Up @@ -2378,11 +2434,7 @@ mod tests {

let mut peer_db = peer_manager.network_globals.peers.write();
let peer_info = peer_db.peer_info_mut(&condition.peer_id).unwrap();
peer_info.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
peer_info.set_meta_data(MetaData::V2(metadata));
peer_info.set_gossipsub_score(condition.gossipsub_score);
peer_info.add_to_score(condition.score);

Expand Down
Loading

0 comments on commit c0b4f01

Please sign in to comment.