Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple Subnet Management #6146

Open
wants to merge 17 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 133 additions & 145 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ clap = { version = "4.5.4", features = ["derive", "cargo", "wrap_help"] }
c-kzg = { version = "1", default-features = false }
compare_fields_derive = { path = "common/compare_fields_derive" }
criterion = "0.5"
delay_map = "0.3"
delay_map = "0.4"
derivative = "2"
dirs = "3"
either = "1.9"
Expand Down
92 changes: 22 additions & 70 deletions beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ use crate::nat;
use crate::network_beacon_processor::InvalidBlockStorage;
use crate::persisted_dht::{clear_dht, load_dht, persist_dht};
use crate::router::{Router, RouterMessage};
use crate::subnet_service::SyncCommitteeService;
use crate::subnet_service::{SubnetService, SubnetServiceMessage, Subscription};
use crate::NetworkConfig;
use crate::{error, metrics};
use crate::{
subnet_service::{AttestationService, SubnetServiceMessage},
NetworkConfig,
};
use beacon_chain::{BeaconChain, BeaconChainTypes};
use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend};
use futures::channel::mpsc::Sender;
Expand Down Expand Up @@ -162,10 +159,8 @@ pub struct NetworkService<T: BeaconChainTypes> {
beacon_chain: Arc<BeaconChain<T>>,
/// The underlying libp2p service that drives all the network interactions.
libp2p: Network<T::EthSpec>,
/// An attestation and subnet manager service.
attestation_service: AttestationService<T>,
/// A sync committeee subnet manager service.
sync_committee_service: SyncCommitteeService<T>,
/// An attestation and sync committee subnet manager service.
subnet_service: SubnetService<T>,
/// The receiver channel for lighthouse to communicate with the network service.
network_recv: mpsc::UnboundedReceiver<NetworkMessage<T::EthSpec>>,
/// The receiver channel for lighthouse to send validator subscription requests.
Expand Down Expand Up @@ -314,16 +309,13 @@ impl<T: BeaconChainTypes> NetworkService<T> {
network_log.clone(),
)?;

// attestation subnet service
let attestation_service = AttestationService::new(
// attestation and sync committee subnet service
let subnet_service = SubnetService::new(
beacon_chain.clone(),
network_globals.local_enr().node_id(),
config,
&network_log,
);
// sync committee subnet service
let sync_committee_service =
SyncCommitteeService::new(beacon_chain.clone(), config, &network_log);

// create a timer for updating network metrics
let metrics_update = tokio::time::interval(Duration::from_secs(METRIC_UPDATE_INTERVAL));
Expand All @@ -341,8 +333,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
let network_service = NetworkService {
beacon_chain,
libp2p,
attestation_service,
sync_committee_service,
subnet_service,
network_recv,
validator_subscription_recv,
router_send,
Expand Down Expand Up @@ -457,11 +448,8 @@ impl<T: BeaconChainTypes> NetworkService<T> {
// handle a message from a validator requesting a subscription to a subnet
Some(msg) = self.validator_subscription_recv.recv() => self.on_validator_subscription_msg(msg).await,

// process any attestation service events
Some(msg) = self.attestation_service.next() => self.on_attestation_service_msg(msg),

// process any sync committee service events
Some(msg) = self.sync_committee_service.next() => self.on_sync_committee_service_message(msg),
// process any subnet service events
Some(msg) = self.subnet_service.next() => self.on_subnet_service_msg(msg),

event = self.libp2p.next_event() => self.on_libp2p_event(event, &mut shutdown_sender).await,

Expand Down Expand Up @@ -549,13 +537,14 @@ impl<T: BeaconChainTypes> NetworkService<T> {
match message {
// attestation information gets processed in the attestation service
PubsubMessage::Attestation(ref subnet_and_attestation) => {
let subnet = subnet_and_attestation.0;
let subnet_id = subnet_and_attestation.0;
let attestation = &subnet_and_attestation.1;
// checks if we have an aggregator for the slot. If so, we should process
// the attestation, else we just just propagate the Attestation.
let should_process = self
.attestation_service
.should_process_attestation(subnet, attestation);
let should_process = self.subnet_service.should_process_attestation(
Subnet::Attestation(subnet_id),
attestation,
);
self.send_to_router(RouterMessage::PubsubMessage(
id,
source,
Expand Down Expand Up @@ -824,23 +813,17 @@ impl<T: BeaconChainTypes> NetworkService<T> {

/// Handle a message sent to the network service.
async fn on_validator_subscription_msg(&mut self, msg: ValidatorSubscriptionMessage) {
match msg {
if let Err(e) = match msg {
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
ValidatorSubscriptionMessage::AttestationSubscribe { subscriptions } => {
if let Err(e) = self
.attestation_service
.validator_subscriptions(subscriptions.into_iter())
{
warn!(self.log, "Attestation validator subscription failed"; "error" => e);
}
let subscriptions = subscriptions.into_iter().map(Subscription::Attestation);
self.subnet_service.validator_subscriptions(subscriptions)
}
ValidatorSubscriptionMessage::SyncCommitteeSubscribe { subscriptions } => {
if let Err(e) = self
.sync_committee_service
.validator_subscriptions(subscriptions)
{
warn!(self.log, "Sync committee calidator subscription failed"; "error" => e);
}
let subscriptions = subscriptions.into_iter().map(Subscription::SyncCommittee);
self.subnet_service.validator_subscriptions(subscriptions)
}
} {
warn!(self.log, "Validator subscription failed"; "error" => e);
}
}

Expand Down Expand Up @@ -875,35 +858,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}

fn on_attestation_service_msg(&mut self, msg: SubnetServiceMessage) {
match msg {
SubnetServiceMessage::Subscribe(subnet) => {
for fork_digest in self.required_gossip_fork_digests() {
let topic =
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
self.libp2p.subscribe(topic);
}
}
SubnetServiceMessage::Unsubscribe(subnet) => {
for fork_digest in self.required_gossip_fork_digests() {
let topic =
GossipTopic::new(subnet.into(), GossipEncoding::default(), fork_digest);
self.libp2p.unsubscribe(topic);
}
}
SubnetServiceMessage::EnrAdd(subnet) => {
self.libp2p.update_enr_subnet(subnet, true);
}
SubnetServiceMessage::EnrRemove(subnet) => {
self.libp2p.update_enr_subnet(subnet, false);
}
SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => {
self.libp2p.discover_subnet_peers(subnets_to_discover);
}
}
}

fn on_sync_committee_service_message(&mut self, msg: SubnetServiceMessage) {
fn on_subnet_service_msg(&mut self, msg: SubnetServiceMessage) {
match msg {
SubnetServiceMessage::Subscribe(subnet) => {
for fork_digest in self.required_gossip_fork_digests() {
Expand All @@ -922,9 +877,6 @@ impl<T: BeaconChainTypes> NetworkService<T> {
SubnetServiceMessage::EnrAdd(subnet) => {
self.libp2p.update_enr_subnet(subnet, true);
}
SubnetServiceMessage::EnrRemove(subnet) => {
self.libp2p.update_enr_subnet(subnet, false);
}
SubnetServiceMessage::DiscoverPeers(subnets_to_discover) => {
self.libp2p.discover_subnet_peers(subnets_to_discover);
}
Expand Down
11 changes: 4 additions & 7 deletions beacon_node/network/src/service/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[cfg(not(debug_assertions))]
// #[cfg(not(debug_assertions))]
#[cfg(test)]
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
mod tests {
use crate::persisted_dht::load_dht;
Expand Down Expand Up @@ -167,21 +167,18 @@ mod tests {
// Subscribe to the topics.
runtime.block_on(async {
while network_globals.gossipsub_subscriptions.read().len() < 2 {
if let Some(msg) = network_service.attestation_service.next().await {
network_service.on_attestation_service_msg(msg);
if let Some(msg) = network_service.subnet_service.next().await {
network_service.on_subnet_service_msg(msg);
}
}
});

// Make sure the service is subscribed to the topics.
let (old_topic1, old_topic2) = {
let mut subnets = SubnetId::compute_subnets_for_epoch::<MinimalEthSpec>(
let mut subnets = SubnetId::compute_attestation_subnets(
network_globals.local_enr().node_id().raw(),
beacon_chain.epoch().unwrap(),
&spec,
)
.unwrap()
.0
.collect::<Vec<_>>();
assert_eq!(2, subnets.len());

Expand Down
Loading
Loading