diff --git a/beacon_node/network/src/subnet_service/attestation_subnets.rs b/beacon_node/network/src/subnet_service/attestation_subnets.rs index ab9ffb95a6c..92c6bb6c3ea 100644 --- a/beacon_node/network/src/subnet_service/attestation_subnets.rs +++ b/beacon_node/network/src/subnet_service/attestation_subnets.rs @@ -29,6 +29,10 @@ pub(crate) const MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD: u64 = 2; /// Currently a whole slot ahead. const ADVANCE_SUBSCRIBE_SLOT_FRACTION: u32 = 1; +/// The number of slots after an aggregator duty where we remove the entry from +/// `aggregate_validators_on_subnet` delay map. +const UNSUBSCRIBE_AFTER_AGGREGATOR_DUTY: u32 = 2; + #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] pub(crate) enum SubscriptionKind { /// Long lived subscriptions. @@ -462,23 +466,27 @@ impl AttestationService { ) -> Result<(), &'static str> { let slot_duration = self.beacon_chain.slot_clock.slot_duration(); + // The short time we schedule the subscription before it's actually required. This + // ensures we are subscribed on time, and allows consecutive subscriptions to the same + // subnet to overlap, reducing subnet churn. + let advance_subscription_duration = slot_duration / ADVANCE_SUBSCRIBE_SLOT_FRACTION; + // The time to the required slot. + let time_to_subscription_slot = self + .beacon_chain + .slot_clock + .duration_to_slot(slot) + .unwrap_or_default(); // If this is a past slot we will just get a 0 duration. + // Calculate how long before we need to subscribe to the subnet. - let time_to_subscription_start = { - // The short time we schedule the subscription before it's actually required. This - // ensures we are subscribed on time, and allows consecutive subscriptions to the same - // subnet to overlap, reducing subnet churn. - let advance_subscription_duration = slot_duration / ADVANCE_SUBSCRIBE_SLOT_FRACTION; - // The time to the required slot. - let time_to_subscription_slot = self - .beacon_chain - .slot_clock - .duration_to_slot(slot) - .unwrap_or_default(); // If this is a past slot we will just get a 0 duration. - time_to_subscription_slot.saturating_sub(advance_subscription_duration) - }; + let time_to_subscription_start = + time_to_subscription_slot.saturating_sub(advance_subscription_duration); + // The time after a duty slot where we no longer need it in the `aggregate_validators_on_subnet` + // delay map. + let time_to_unsubscribe = + time_to_subscription_slot + UNSUBSCRIBE_AFTER_AGGREGATOR_DUTY * slot_duration; if let Some(tracked_vals) = self.aggregate_validators_on_subnet.as_mut() { - tracked_vals.insert(ExactSubnet { subnet_id, slot }); + tracked_vals.insert_at(ExactSubnet { subnet_id, slot }, time_to_unsubscribe); } // If the subscription should be done in the future, schedule it. Otherwise subscribe