Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deterministic subnet peer discovery #4

Closed
4 changes: 4 additions & 0 deletions beacon_node/lighthouse_network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ pub struct Config {

/// Configuration for the inbound rate limiter (requests received by this node).
pub inbound_rate_limiter_config: Option<InboundRateLimiterConfig>,

/// Whether prefix search for attestation subnets is enabled.
pub prefix_search_for_subnet: bool,
}

impl Config {
Expand Down Expand Up @@ -375,6 +378,7 @@ impl Default for Config {
outbound_rate_limiter_config: None,
invalid_block_storage: None,
inbound_rate_limiter_config: None,
prefix_search_for_subnet: false,
}
}
}
Expand Down
140 changes: 112 additions & 28 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub mod enr_ext;

// Allow external use of the lighthouse ENR builder
use crate::service::TARGET_SUBNET_PEERS;
use crate::{error, Enr, NetworkConfig, NetworkGlobals, Subnet, SubnetDiscovery};
use crate::{error, Enr, NetworkConfig, NetworkGlobals, Subnet};
use crate::{metrics, ClearDialError};
use discv5::{enr::NodeId, Discv5, Discv5Event};
pub use enr::{
Expand Down Expand Up @@ -48,6 +48,7 @@ use tokio::sync::mpsc;
use types::{EnrForkId, EthSpec};

mod subnet_predicate;
use crate::types::{DiscoveryTarget, TargetedSubnetDiscovery};
pub use subnet_predicate::subnet_predicate;

/// Local ENR storage filename.
Expand Down Expand Up @@ -96,6 +97,7 @@ struct SubnetQuery {
subnet: Subnet,
min_ttl: Option<Instant>,
retries: usize,
target: DiscoveryTarget,
}

impl SubnetQuery {
Expand All @@ -120,6 +122,7 @@ impl std::fmt::Debug for SubnetQuery {
.field("subnet", &self.subnet)
.field("min_ttl_secs", &min_ttl_secs)
.field("retries", &self.retries)
.field("target", &self.target)
.finish()
}
}
Expand Down Expand Up @@ -359,11 +362,13 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
let target_peers = std::cmp::min(FIND_NODE_QUERY_CLOSEST_PEERS, target_peers);
debug!(self.log, "Starting a peer discovery request"; "target_peers" => target_peers );
self.find_peer_active = true;
self.start_query(QueryType::FindPeers, target_peers, |_| true);
self.start_query(QueryType::FindPeers, NodeId::random(), target_peers, |_| {
true
});
}

/// Processes a request to search for more peers on a subnet.
pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec<SubnetDiscovery>) {
pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec<TargetedSubnetDiscovery>) {
// If the discv5 service isn't running, ignore queries
if !self.started {
return;
Expand All @@ -374,7 +379,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
"subnets" => ?subnets_to_discover.iter().map(|s| s.subnet).collect::<Vec<_>>()
);
for subnet in subnets_to_discover {
self.add_subnet_query(subnet.subnet, subnet.min_ttl, 0);
self.add_subnet_query(subnet.subnet, subnet.min_ttl, 0, subnet.target);
}
}

Expand Down Expand Up @@ -601,7 +606,13 @@ impl<TSpec: EthSpec> Discovery<TSpec> {

/// Adds a subnet query if one doesn't exist. If a subnet query already exists, this
/// updates the min_ttl field.
fn add_subnet_query(&mut self, subnet: Subnet, min_ttl: Option<Instant>, retries: usize) {
fn add_subnet_query(
&mut self,
subnet: Subnet,
min_ttl: Option<Instant>,
retries: usize,
target: DiscoveryTarget,
) {
// remove the entry and complete the query if greater than the maximum search count
if retries > MAX_DISCOVERY_RETRY {
debug!(
Expand Down Expand Up @@ -633,6 +644,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
subnet,
min_ttl,
retries,
target,
});
metrics::set_gauge(&metrics::DISCOVERY_QUEUE, self.queued_queries.len() as i64);
}
Expand Down Expand Up @@ -684,8 +696,6 @@ impl<TSpec: EthSpec> Discovery<TSpec> {

/// Runs a discovery request for a given group of subnets.
fn start_subnet_query(&mut self, subnet_queries: Vec<SubnetQuery>) {
let mut filtered_subnets: Vec<Subnet> = Vec::new();

// find subnet queries that are still necessary
let filtered_subnet_queries: Vec<SubnetQuery> = subnet_queries
.into_iter()
Expand Down Expand Up @@ -713,27 +723,81 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
"connected_peers_on_subnet" => peers_on_subnet,
"peers_to_find" => target_peers,
);

filtered_subnets.push(subnet_query.subnet);
true
})
.collect();

// Only start a discovery query if we have a subnet to look for.
if !filtered_subnet_queries.is_empty() {
// build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate
let subnet_predicate = subnet_predicate::<TSpec>(filtered_subnets, &self.log);
let (random_search, prefix_search): (Vec<_>, Vec<_>) = filtered_subnet_queries
.into_iter()
.partition(|q| q.target == DiscoveryTarget::Random);

if !random_search.is_empty() {
// Build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate
let subnet_predicate = subnet_predicate::<TSpec>(
random_search.iter().map(|q| q.subnet).collect::<Vec<_>>(),
&self.log,
);
debug!(
self.log,
"Starting grouped subnet query";
"subnets" => ?random_search,
);
self.start_query(
QueryType::Subnet(random_search),
NodeId::random(),
TARGET_PEERS_FOR_GROUPED_QUERY,
subnet_predicate,
);
}

debug!(
self.log,
"Starting grouped subnet query";
"subnets" => ?filtered_subnet_queries,
);
self.start_query(
QueryType::Subnet(filtered_subnet_queries),
TARGET_PEERS_FOR_GROUPED_QUERY,
subnet_predicate,
);
if !prefix_search.is_empty() {
// Split the grouped subnet query into individual queries in order to prefix search.
for prefix_query in prefix_search {
// Target node
let target_node = match &prefix_query.target {
DiscoveryTarget::Random => unreachable!("target should be Prefix here"),
DiscoveryTarget::Prefix(node_ids) if node_ids.is_empty() => {
warn!(
self.log,
"No NodeIds given for prefix search, falling back to random search.";
"subnet" => ?prefix_query,
);
NodeId::random()
}
DiscoveryTarget::Prefix(node_ids) => {
let pos = prefix_query.retries % node_ids.len();
if let Some(id) = node_ids.get(pos) {
*id
} else {
warn!(
self.log,
"NodeIds were given for prefix search, but the offset was wrong. Choosing the first one instead.";
"subnet" => ?prefix_query,
);
*node_ids
.first()
.expect("Already checked that `node_ids` is not empty.")
}
}
};
// Build the subnet predicate as a combination of the eth2_fork_predicate and the subnet predicate
let subnet_predicate =
subnet_predicate::<TSpec>(vec![prefix_query.subnet], &self.log);
debug!(
self.log,
"Starting prefix search query";
"subnet" => ?prefix_query,
);
self.start_query(
QueryType::Subnet(vec![prefix_query]),
target_node,
TARGET_PEERS_FOR_GROUPED_QUERY,
subnet_predicate,
);
}
}
}
}

Expand All @@ -744,7 +808,8 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
/// ENR.
fn start_query(
&mut self,
query: QueryType,
query_type: QueryType,
target_node: NodeId,
target_peers: usize,
additional_predicate: impl Fn(&Enr) -> bool + Send + 'static,
) {
Expand All @@ -767,14 +832,13 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
// General predicate
let predicate: Box<dyn Fn(&Enr) -> bool + Send> =
Box::new(move |enr: &Enr| eth2_fork_predicate(enr) && additional_predicate(enr));

// Build the future
let query_future = self
.discv5
// Generate a random target node id.
.find_node_predicate(NodeId::random(), predicate, target_peers)
.find_node_predicate(target_node, predicate, target_peers)
.map(|v| QueryResult {
query_type: query,
query_type,
result: v,
});

Expand Down Expand Up @@ -818,7 +882,12 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
Ok(r) if r.is_empty() => {
debug!(self.log, "Grouped subnet discovery query yielded no results."; "subnets_searched_for" => ?subnets_searched_for);
queries.iter().for_each(|query| {
self.add_subnet_query(query.subnet, query.min_ttl, query.retries + 1);
self.add_subnet_query(
query.subnet,
query.min_ttl,
query.retries + 1,
query.target.clone(),
);
})
}
Ok(r) => {
Expand All @@ -845,7 +914,12 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
v.inc();
}
// A subnet query has completed. Add back to the queue, incrementing retries.
self.add_subnet_query(query.subnet, query.min_ttl, query.retries + 1);
self.add_subnet_query(
query.subnet,
query.min_ttl,
query.retries + 1,
query.target.clone(),
);

// Check the specific subnet against the enr
let subnet_predicate =
Expand Down Expand Up @@ -1205,17 +1279,24 @@ mod tests {
subnet: Subnet::Attestation(SubnetId::new(1)),
min_ttl: Some(now),
retries: 0,
target: DiscoveryTarget::Prefix(vec![NodeId::random(), NodeId::random()]),
};
discovery.add_subnet_query(
subnet_query.subnet,
subnet_query.min_ttl,
subnet_query.retries,
subnet_query.target.clone(),
);
assert_eq!(discovery.queued_queries.back(), Some(&subnet_query));

// New query should replace old query
subnet_query.min_ttl = Some(now + Duration::from_secs(1));
discovery.add_subnet_query(subnet_query.subnet, subnet_query.min_ttl, 1);
discovery.add_subnet_query(
subnet_query.subnet,
subnet_query.min_ttl,
1,
subnet_query.target.clone(),
);

subnet_query.retries += 1;

Expand All @@ -1231,6 +1312,7 @@ mod tests {
subnet_query.subnet,
subnet_query.min_ttl,
MAX_DISCOVERY_RETRY + 1,
subnet_query.target.clone(),
);

assert_eq!(discovery.queued_queries.len(), 0);
Expand Down Expand Up @@ -1263,11 +1345,13 @@ mod tests {
subnet: Subnet::Attestation(SubnetId::new(1)),
min_ttl: instant1,
retries: 0,
target: DiscoveryTarget::Prefix(vec![NodeId::random(), NodeId::random()]),
},
SubnetQuery {
subnet: Subnet::Attestation(SubnetId::new(2)),
min_ttl: instant2,
retries: 0,
target: DiscoveryTarget::Prefix(vec![NodeId::random(), NodeId::random()]),
},
]);

Expand Down
21 changes: 15 additions & 6 deletions beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ use crate::rpc::*;
use crate::service::behaviour::BehaviourEvent;
pub use crate::service::behaviour::Gossipsub;
use crate::types::{
fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic,
SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS,
CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
fork_core_topics, subnet_from_topic_hash, DiscoveryTarget, GossipEncoding, GossipKind,
GossipTopic, SnappyTransform, Subnet, TargetedSubnetDiscovery, ALTAIR_CORE_TOPICS,
BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, DENEB_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
};
use crate::EnrExt;
use crate::Eth2Enr;
Expand Down Expand Up @@ -52,6 +52,7 @@ mod behaviour;
mod gossip_cache;
pub mod gossipsub_scoring_parameters;
pub mod utils;

/// The number of peers we target per subnet for discovery queries.
pub const TARGET_SUBNET_PEERS: usize = 6;

Expand Down Expand Up @@ -912,13 +913,13 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {

/// Attempts to discover new peers for a given subnet. The `min_ttl` gives the time at which we
/// would like to retain the peers for.
pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec<SubnetDiscovery>) {
pub fn discover_subnet_peers(&mut self, subnets_to_discover: Vec<TargetedSubnetDiscovery>) {
// If discovery is not started or disabled, ignore the request
if !self.discovery().started {
return;
}

let filtered: Vec<SubnetDiscovery> = subnets_to_discover
let filtered: Vec<TargetedSubnetDiscovery> = subnets_to_discover
.into_iter()
.filter(|s| {
// Extend min_ttl of connected peers on required subnets
Expand Down Expand Up @@ -1493,7 +1494,15 @@ impl<AppReqId: ReqId, TSpec: EthSpec> Network<AppReqId, TSpec> {
}
PeerManagerEvent::DiscoverSubnetPeers(subnets_to_discover) => {
// Peer manager has requested a subnet discovery query for more peers.
self.discover_subnet_peers(subnets_to_discover);
let subnet_discoveries = subnets_to_discover
.into_iter()
.map(|s| TargetedSubnetDiscovery {
subnet: s.subnet,
min_ttl: s.min_ttl,
target: DiscoveryTarget::Random,
})
.collect::<Vec<_>>();
self.discover_subnet_peers(subnet_discoveries);
None
}
PeerManagerEvent::Ping(peer_id) => {
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;

pub use globals::NetworkGlobals;
pub use pubsub::{PubsubMessage, SnappyTransform};
pub use subnet::{Subnet, SubnetDiscovery};
pub use subnet::{DiscoveryTarget, Subnet, SubnetDiscovery, TargetedSubnetDiscovery};
pub use sync_state::{BackFillState, SyncState};
pub use topics::{
core_topics_to_subscribe, fork_core_topics, subnet_from_topic_hash, GossipEncoding, GossipKind,
Expand Down
13 changes: 13 additions & 0 deletions beacon_node/lighthouse_network/src/types/subnet.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use discv5::enr::NodeId;
use serde::Serialize;
use std::time::Instant;
use types::{SubnetId, SyncSubnetId};
Expand Down Expand Up @@ -26,3 +27,15 @@ impl PartialEq for SubnetDiscovery {
self.subnet.eq(&other.subnet)
}
}

pub struct TargetedSubnetDiscovery {
pub subnet: Subnet,
pub min_ttl: Option<Instant>,
pub target: DiscoveryTarget,
}

#[derive(Debug, Clone, PartialEq)]
pub enum DiscoveryTarget {
Random,
Prefix(Vec<NodeId>),
}
1 change: 1 addition & 0 deletions beacon_node/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod metrics;
mod nat;
mod network_beacon_processor;
mod persisted_dht;
mod prefix_mapping;
mod router;
mod status;
mod subnet_service;
Expand Down
Loading
Loading