Skip to content

Commit

Permalink
Implement Metadatav3 (#6303)
Browse files Browse the repository at this point in the history
* Add a V3 variant for metadata

* Add v3 for requests; persistence logic

* Set custody_subnets on setting metadata

* Fix tests

* Address some comments

* fmt

* Address more comments

* Fix tests

* Update metadata rpc limits

* Update method doc.
  • Loading branch information
pawanjay176 authored Aug 28, 2024
1 parent f75a2cf commit bcff4aa
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 60 deletions.
52 changes: 43 additions & 9 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Implementation of Lighthouse's peer management system.

use crate::discovery::enr_ext::EnrExt;
use crate::discovery::peer_id_to_node_id;
use crate::rpc::{GoodbyeReason, MetaData, Protocol, RPCError, RPCResponseErrorCode};
use crate::service::TARGET_SUBNET_PEERS;
use crate::{error, metrics, Gossipsub};
Expand Down Expand Up @@ -716,7 +717,8 @@ impl<E: EthSpec> PeerManager<E> {
debug!(self.log, "Obtained peer's metadata";
"peer_id" => %peer_id, "new_seq_no" => meta_data.seq_number());
}
peer_info.set_meta_data(meta_data);
let node_id_opt = peer_id_to_node_id(peer_id).ok();
peer_info.set_meta_data(meta_data, node_id_opt, &self.network_globals.spec);
} else {
error!(self.log, "Received METADATA from an unknown peer";
"peer_id" => %peer_id);
Expand Down Expand Up @@ -1678,7 +1680,11 @@ mod tests {
.write()
.peer_info_mut(&peer0)
.unwrap()
.set_meta_data(MetaData::V2(metadata));
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
peer_manager
.network_globals
.peers
Expand All @@ -1698,7 +1704,11 @@ mod tests {
.write()
.peer_info_mut(&peer2)
.unwrap()
.set_meta_data(MetaData::V2(metadata));
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
peer_manager
.network_globals
.peers
Expand All @@ -1718,7 +1728,11 @@ mod tests {
.write()
.peer_info_mut(&peer4)
.unwrap()
.set_meta_data(MetaData::V2(metadata));
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
peer_manager
.network_globals
.peers
Expand Down Expand Up @@ -1792,7 +1806,11 @@ mod tests {
.write()
.peer_info_mut(&peer)
.unwrap()
.set_meta_data(MetaData::V2(metadata));
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
peer_manager
.network_globals
.peers
Expand Down Expand Up @@ -1916,7 +1934,11 @@ mod tests {
.write()
.peer_info_mut(&peer)
.unwrap()
.set_meta_data(MetaData::V2(metadata));
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
let long_lived_subnets = peer_manager
.network_globals
.peers
Expand Down Expand Up @@ -2025,7 +2047,11 @@ mod tests {
.write()
.peer_info_mut(&peer)
.unwrap()
.set_meta_data(MetaData::V2(metadata));
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
let long_lived_subnets = peer_manager
.network_globals
.peers
Expand Down Expand Up @@ -2191,7 +2217,11 @@ mod tests {
.write()
.peer_info_mut(&peer)
.unwrap()
.set_meta_data(MetaData::V2(metadata));
.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
let long_lived_subnets = peer_manager
.network_globals
.peers
Expand Down Expand Up @@ -2348,7 +2378,11 @@ mod tests {

let mut peer_db = peer_manager.network_globals.peers.write();
let peer_info = peer_db.peer_info_mut(&condition.peer_id).unwrap();
peer_info.set_meta_data(MetaData::V2(metadata));
peer_info.set_meta_data(
MetaData::V2(metadata),
None,
&peer_manager.network_globals.spec,
);
peer_info.set_gossipsub_score(condition.gossipsub_score);
peer_info.add_to_score(condition.score);

Expand Down
13 changes: 4 additions & 9 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,22 +256,17 @@ impl<E: EthSpec> PeerDB<E> {
.map(|(peer_id, _)| peer_id)
}

/// Returns an iterator of all good gossipsub peers that are supposed to be custodying
/// the given subnet id.
pub fn good_custody_subnet_peer(
&self,
subnet: DataColumnSubnetId,
) -> impl Iterator<Item = &PeerId> {
self.peers
.iter()
.filter(move |(_, info)| {
// TODO(das): we currently consider peer to be a subnet peer if the peer is *either*
// subscribed to the subnet or assigned to the subnet.
// The first condition is currently required as we don't have custody count in
// metadata implemented yet, and therefore unable to reliably determine custody
// subnet count (ENR is not always available).
// This condition can be removed later so that we can identify peers that are not
// serving custody columns and penalise accordingly.
let is_custody_subnet_peer = info.on_subnet_gossipsub(&Subnet::DataColumn(subnet))
|| info.is_assigned_to_custody_subnet(&subnet);
// The custody_subnets hashset can be populated via enr or metadata
let is_custody_subnet_peer = info.is_assigned_to_custody_subnet(&subnet);
info.is_connected() && info.is_good_gossipsub_peer() && is_custody_subnet_peer
})
.map(|(peer_id, _)| peer_id)
Expand Down
42 changes: 30 additions & 12 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use super::score::{PeerAction, Score, ScoreState};
use super::sync_status::SyncStatus;
use crate::discovery::Eth2Enr;
use crate::{rpc::MetaData, types::Subnet};
use discv5::enr::NodeId;
use discv5::Enr;
use libp2p::core::multiaddr::{Multiaddr, Protocol};
use serde::{
Expand All @@ -13,7 +14,7 @@ use std::collections::HashSet;
use std::net::IpAddr;
use std::time::Instant;
use strum::AsRefStr;
use types::{DataColumnSubnetId, EthSpec};
use types::{ChainSpec, DataColumnSubnetId, EthSpec};
use PeerConnectionStatus::*;

/// Information about a given connected peer.
Expand Down Expand Up @@ -89,6 +90,7 @@ impl<E: EthSpec> PeerInfo<E> {
}

/// Returns if the peer is subscribed to a given `Subnet` from the metadata attnets/syncnets field.
/// Also returns true if the peer is assigned to custody a given data column `Subnet` computed from the metadata `custody_column_count` field or ENR `csc` field.
pub fn on_subnet_metadata(&self, subnet: &Subnet) -> bool {
if let Some(meta_data) = &self.meta_data {
match subnet {
Expand All @@ -100,15 +102,7 @@ impl<E: EthSpec> PeerInfo<E> {
.syncnets()
.map_or(false, |s| s.get(**id as usize).unwrap_or(false))
}
Subnet::DataColumn(_) => {
// TODO(das): Pending spec PR https://github.com/ethereum/consensus-specs/pull/3821
// We should use MetaDataV3 for peer selection rather than
// looking at subscribed peers (current behavior). Until MetaDataV3 is
// implemented, this is the perhaps the only viable option on the current devnet
// as the peer count is low and it's important to identify supernodes to get a
// good distribution of peers across subnets.
return true;
}
Subnet::DataColumn(column) => return self.custody_subnets.contains(column),
}
}
false
Expand Down Expand Up @@ -364,8 +358,32 @@ impl<E: EthSpec> PeerInfo<E> {

/// Sets an explicit value for the meta data.
// VISIBILITY: The peer manager is able to adjust the meta_data
pub(in crate::peer_manager) fn set_meta_data(&mut self, meta_data: MetaData<E>) {
self.meta_data = Some(meta_data)
pub(in crate::peer_manager) fn set_meta_data(
&mut self,
meta_data: MetaData<E>,
node_id_opt: Option<NodeId>,
spec: &ChainSpec,
) {
// If we don't have a node id, we cannot compute the custody duties anyway
let Some(node_id) = node_id_opt else {
self.meta_data = Some(meta_data);
return;
};

// Already set by enr if custody_subnets is non empty
if self.custody_subnets.is_empty() {
if let Ok(custody_subnet_count) = meta_data.custody_subnet_count() {
let custody_subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
node_id.raw().into(),
std::cmp::min(*custody_subnet_count, spec.data_column_sidecar_subnet_count),
spec,
)
.collect::<HashSet<_>>();
self.set_custody_subnets(custody_subnets);
}
}

self.meta_data = Some(meta_data);
}

/// Sets the connection status of the peer.
Expand Down
42 changes: 39 additions & 3 deletions beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ impl<E: EthSpec> Encoder<RPCCodedResponse<E>> for SSZSnappyInboundCodec<E> {
{
match self.protocol.versioned_protocol {
SupportedProtocol::MetaDataV1 => res.metadata_v1().as_ssz_bytes(),
// We always send V2 metadata responses from the behaviour
// No change required.
SupportedProtocol::MetaDataV2 => res.metadata_v2().as_ssz_bytes(),
SupportedProtocol::MetaDataV3 => {
res.metadata_v3(&self.fork_context.spec).as_ssz_bytes()
}
_ => unreachable!(
"We only send metadata responses on negotiating metadata requests"
),
Expand Down Expand Up @@ -136,6 +137,9 @@ impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
if self.protocol.versioned_protocol == SupportedProtocol::MetaDataV2 {
return Ok(Some(InboundRequest::MetaData(MetadataRequest::new_v2())));
}
if self.protocol.versioned_protocol == SupportedProtocol::MetaDataV3 {
return Ok(Some(InboundRequest::MetaData(MetadataRequest::new_v3())));
}
let Some(length) = handle_length(&mut self.inner, &mut self.len, src)? else {
return Ok(None);
};
Expand Down Expand Up @@ -549,6 +553,15 @@ fn handle_rpc_request<E: EthSpec>(
}
// MetaData requests return early from InboundUpgrade and do not reach the decoder.
// Handle this case just for completeness.
SupportedProtocol::MetaDataV3 => {
if !decoded_buffer.is_empty() {
Err(RPCError::InternalError(
"Metadata requests shouldn't reach decoder",
))
} else {
Ok(Some(InboundRequest::MetaData(MetadataRequest::new_v3())))
}
}
SupportedProtocol::MetaDataV2 => {
if !decoded_buffer.is_empty() {
Err(RPCError::InternalError(
Expand Down Expand Up @@ -712,7 +725,10 @@ fn handle_rpc_response<E: EthSpec>(
),
)),
},
// MetaData V2 responses have no context bytes, so behave similarly to V1 responses
// MetaData V2/V3 responses have no context bytes, so behave similarly to V1 responses
SupportedProtocol::MetaDataV3 => Ok(Some(RPCResponse::MetaData(MetaData::V3(
MetaDataV3::from_ssz_bytes(decoded_buffer)?,
)))),
SupportedProtocol::MetaDataV2 => Ok(Some(RPCResponse::MetaData(MetaData::V2(
MetaDataV2::from_ssz_bytes(decoded_buffer)?,
)))),
Expand Down Expand Up @@ -984,6 +1000,15 @@ mod tests {
})
}

fn metadata_v3() -> MetaData<Spec> {
MetaData::V3(MetaDataV3 {
seq_number: 1,
attnets: EnrAttestationBitfield::<Spec>::default(),
syncnets: EnrSyncCommitteeBitfield::<Spec>::default(),
custody_subnet_count: 1,
})
}

/// Encodes the given protocol response as bytes.
fn encode_response(
protocol: SupportedProtocol,
Expand Down Expand Up @@ -1217,6 +1242,17 @@ mod tests {
Ok(Some(RPCResponse::MetaData(metadata()))),
);

// A MetaDataV3 still encodes as a MetaDataV2 since version is Version::V2
assert_eq!(
encode_then_decode_response(
SupportedProtocol::MetaDataV2,
RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v3())),
ForkName::Base,
&chain_spec,
),
Ok(Some(RPCResponse::MetaData(metadata_v2()))),
);

assert_eq!(
encode_then_decode_response(
SupportedProtocol::BlobsByRangeV1,
Expand Down
Loading

0 comments on commit bcff4aa

Please sign in to comment.