From 244a460e704184c0e0c356ce9dda20afd995a68a Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 10 Oct 2024 05:34:41 +0300 Subject: [PATCH] Bound min size of dynamic processor queues (#6466) * Bound min size of dynamic processor queues * Use max * Add test --- beacon_node/beacon_processor/src/lib.rs | 33 +++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index cd5a1d6cff0..02c287b68e3 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -93,6 +93,11 @@ const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_Q /// slightly, we don't need to adjust the queues during the lifetime of a process. const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110; +/// Minimum size of dynamically sized queues. Due to integer division we don't want 0 length queues +/// as the processor won't process that message type. 128 is an arbitrary value value >= 1 that +/// seems reasonable. +const MIN_QUEUE_LEN: usize = 128; + /// Maximum number of queued items that will be stored before dropping them pub struct BeaconProcessorQueueLengths { aggregate_queue: usize, @@ -155,9 +160,15 @@ impl BeaconProcessorQueueLengths { aggregate_queue: 4096, unknown_block_aggregate_queue: 1024, // Capacity for a full slot's worth of attestations if subscribed to all subnets - attestation_queue: active_validator_count / slots_per_epoch, + attestation_queue: std::cmp::max( + active_validator_count / slots_per_epoch, + MIN_QUEUE_LEN, + ), // Capacity for a full slot's worth of attestations if subscribed to all subnets - unknown_block_attestation_queue: active_validator_count / slots_per_epoch, + unknown_block_attestation_queue: std::cmp::max( + active_validator_count / slots_per_epoch, + MIN_QUEUE_LEN, + ), sync_message_queue: 2048, sync_contribution_queue: 1024, gossip_voluntary_exit_queue: 4096, @@ -1686,3 +1697,21 @@ impl Drop for SendOnDrop { } } } + +#[cfg(test)] +mod tests { + use super::*; + use types::{BeaconState, ChainSpec, Eth1Data, ForkName, MainnetEthSpec}; + + #[test] + fn min_queue_len() { + // State with no validators. + let spec = ForkName::latest().make_genesis_spec(ChainSpec::mainnet()); + let genesis_time = 0; + let state = BeaconState::::new(genesis_time, Eth1Data::default(), &spec); + assert_eq!(state.validators().len(), 0); + let queue_lengths = BeaconProcessorQueueLengths::from_state(&state, &spec).unwrap(); + assert_eq!(queue_lengths.attestation_queue, MIN_QUEUE_LEN); + assert_eq!(queue_lengths.unknown_block_attestation_queue, MIN_QUEUE_LEN); + } +}