Skip to content

Commit

Permalink
Merge latest unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed Aug 11, 2023
2 parents 8858f0e + 1fcada8 commit 8dfeb3c
Show file tree
Hide file tree
Showing 34 changed files with 1,927 additions and 984 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ pub struct ChainConfig {
///
/// This is useful for block builders and testing.
pub always_prepare_payload: bool,
/// Whether backfill sync processing should be rate-limited.
pub enable_backfill_rate_limiting: bool,
/// Whether to use `ProgressiveBalancesCache` in unrealized FFG progression calculation.
pub progressive_balances_mode: ProgressiveBalancesMode,
/// Number of epochs between each migration of data from the hot database to the freezer.
Expand Down Expand Up @@ -114,7 +112,6 @@ impl Default for ChainConfig {
shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE,
genesis_backfill: false,
always_prepare_payload: false,
enable_backfill_rate_limiting: true,
progressive_balances_mode: ProgressiveBalancesMode::Checked,
epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION,
}
Expand Down
4 changes: 3 additions & 1 deletion beacon_node/beacon_processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ types = { path = "../../consensus/types" }
ethereum_ssz = "0.5.0"
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
parking_lot = "0.12.0"
parking_lot = "0.12.0"
num_cpus = "1.13.0"
serde = { version = "1.0.116", features = ["derive"] }
127 changes: 115 additions & 12 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use lighthouse_network::NetworkGlobals;
use lighthouse_network::{MessageId, PeerId};
use logging::TimeLatch;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use slog::{crit, debug, error, trace, warn, Logger};
use slot_clock::SlotClock;
use std::cmp;
Expand All @@ -70,7 +71,7 @@ pub mod work_reprocessing_queue;
/// The maximum size of the channel for work events to the `BeaconProcessor`.
///
/// Setting this too low will cause consensus messages to be dropped.
pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
const DEFAULT_MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;

/// The maximum size of the channel for idle events to the `BeaconProcessor`.
///
Expand All @@ -79,7 +80,7 @@ pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
const MAX_IDLE_QUEUE_LEN: usize = 16_384;

/// The maximum size of the channel for re-processing work events.
pub const MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * MAX_WORK_EVENT_QUEUE_LEN / 4;
const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4;

/// The maximum number of queued `Attestation` objects that will be stored before we start dropping
/// them.
Expand Down Expand Up @@ -167,6 +168,14 @@ const MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN: usize = 16_384;
/// will be stored before we start dropping them.
const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024;

/// The maximum number of priority-0 (highest priority) messages that will be queued before
/// they begin to be dropped.
const MAX_API_REQUEST_P0_QUEUE_LEN: usize = 1_024;

/// The maximum number of priority-1 (second-highest priority) messages that will be queued before
/// they begin to be dropped.
const MAX_API_REQUEST_P1_QUEUE_LEN: usize = 1_024;

/// The name of the manager tokio task.
const MANAGER_TASK_NAME: &str = "beacon_processor_manager";

Expand All @@ -184,8 +193,8 @@ const WORKER_TASK_NAME: &str = "beacon_processor_worker";
/// Poisoning occurs when an invalid signature is included in a batch of attestations. A single
/// invalid signature causes the entire batch to fail. When a batch fails, we fall-back to
/// individually verifying each attestation signature.
const MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64;
const MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64;
const DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64;
const DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64;

/// Unique IDs used for metrics and testing.
pub const WORKER_FREED: &str = "worker_freed";
Expand Down Expand Up @@ -215,6 +224,61 @@ pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update";
pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";
pub const API_REQUEST_P0: &str = "api_request_p0";
pub const API_REQUEST_P1: &str = "api_request_p1";

#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct BeaconProcessorConfig {
pub max_workers: usize,
pub max_work_event_queue_len: usize,
pub max_scheduled_work_queue_len: usize,
pub max_gossip_attestation_batch_size: usize,
pub max_gossip_aggregate_batch_size: usize,
pub enable_backfill_rate_limiting: bool,
}

impl Default for BeaconProcessorConfig {
fn default() -> Self {
Self {
max_workers: cmp::max(1, num_cpus::get()),
max_work_event_queue_len: DEFAULT_MAX_WORK_EVENT_QUEUE_LEN,
max_scheduled_work_queue_len: DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN,
max_gossip_attestation_batch_size: DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE,
max_gossip_aggregate_batch_size: DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE,
enable_backfill_rate_limiting: true,
}
}
}

// The channels necessary to instantiate a `BeaconProcessor`.
pub struct BeaconProcessorChannels<E: EthSpec> {
pub beacon_processor_tx: BeaconProcessorSend<E>,
pub beacon_processor_rx: mpsc::Receiver<WorkEvent<E>>,
pub work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage>,
pub work_reprocessing_rx: mpsc::Receiver<ReprocessQueueMessage>,
}

impl<E: EthSpec> BeaconProcessorChannels<E> {
pub fn new(config: &BeaconProcessorConfig) -> Self {
let (beacon_processor_tx, beacon_processor_rx) =
mpsc::channel(config.max_scheduled_work_queue_len);
let (work_reprocessing_tx, work_reprocessing_rx) =
mpsc::channel(config.max_scheduled_work_queue_len);

Self {
beacon_processor_tx: BeaconProcessorSend(beacon_processor_tx),
beacon_processor_rx,
work_reprocessing_rx,
work_reprocessing_tx,
}
}
}

impl<E: EthSpec> Default for BeaconProcessorChannels<E> {
fn default() -> Self {
Self::new(&BeaconProcessorConfig::default())
}
}

/// A simple first-in-first-out queue with a maximum length.
struct FifoQueue<T> {
Expand Down Expand Up @@ -363,7 +427,7 @@ impl<E: EthSpec> WorkEvent<E> {
}
}

impl<E: EthSpec> std::convert::From<ReadyWork> for WorkEvent<E> {
impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
fn from(ready_work: ReadyWork) -> Self {
match ready_work {
ReadyWork::Block(QueuedGossipBlock {
Expand Down Expand Up @@ -465,6 +529,10 @@ impl<E: EthSpec> BeaconProcessorSend<E> {
pub type AsyncFn = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
pub type BlockingFn = Box<dyn FnOnce() + Send + Sync>;
pub type BlockingFnWithManualSendOnIdle = Box<dyn FnOnce(SendOnDrop) + Send + Sync>;
pub enum BlockingOrAsync {
Blocking(BlockingFn),
Async(AsyncFn),
}

/// Indicates the type of work to be performed and therefore its priority and
/// queuing specifics.
Expand Down Expand Up @@ -523,6 +591,8 @@ pub enum Work<E: EthSpec> {
BlocksByRootsRequest(BlockingFnWithManualSendOnIdle),
GossipBlsToExecutionChange(BlockingFn),
LightClientBootstrapRequest(BlockingFn),
ApiRequestP0(BlockingOrAsync),
ApiRequestP1(BlockingOrAsync),
}

impl<E: EthSpec> fmt::Debug for Work<E> {
Expand Down Expand Up @@ -560,6 +630,8 @@ impl<E: EthSpec> Work<E> {
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE,
Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE,
Work::ApiRequestP0 { .. } => API_REQUEST_P0,
Work::ApiRequestP1 { .. } => API_REQUEST_P1,
}
}
}
Expand Down Expand Up @@ -638,7 +710,7 @@ pub struct BeaconProcessor<E: EthSpec> {
pub executor: TaskExecutor,
pub max_workers: usize,
pub current_workers: usize,
pub enable_backfill_rate_limiting: bool,
pub config: BeaconProcessorConfig,
pub log: Logger,
}

Expand Down Expand Up @@ -714,11 +786,13 @@ impl<E: EthSpec> BeaconProcessor<E> {

let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN);

let mut api_request_p0_queue = FifoQueue::new(MAX_API_REQUEST_P0_QUEUE_LEN);
let mut api_request_p1_queue = FifoQueue::new(MAX_API_REQUEST_P1_QUEUE_LEN);

// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
// receive them back once they are ready (`ready_work_rx`).
let (ready_work_tx, ready_work_rx) =
mpsc::channel::<ReadyWork>(MAX_SCHEDULED_WORK_QUEUE_LEN);

mpsc::channel::<ReadyWork>(self.config.max_scheduled_work_queue_len);
spawn_reprocess_scheduler(
ready_work_tx,
work_reprocessing_rx,
Expand All @@ -739,7 +813,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
reprocess_work_rx: ready_work_rx,
};

let enable_backfill_rate_limiting = self.enable_backfill_rate_limiting;
let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting;

loop {
let work_event = match inbound_events.next().await {
Expand Down Expand Up @@ -850,12 +924,17 @@ impl<E: EthSpec> BeaconProcessor<E> {
// required to verify some attestations.
} else if let Some(item) = gossip_block_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check the priority 0 API requests after blocks, but before attestations.
} else if let Some(item) = api_request_p0_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check the aggregates, *then* the unaggregates since we assume that
// aggregates are more valuable to local validators and effectively give us
// more information with less signature verification time.
} else if aggregate_queue.len() > 0 {
let batch_size =
cmp::min(aggregate_queue.len(), MAX_GOSSIP_AGGREGATE_BATCH_SIZE);
let batch_size = cmp::min(
aggregate_queue.len(),
self.config.max_gossip_aggregate_batch_size,
);

if batch_size < 2 {
// One single aggregate is in the queue, process it individually.
Expand Down Expand Up @@ -914,7 +993,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
} else if attestation_queue.len() > 0 {
let batch_size = cmp::min(
attestation_queue.len(),
MAX_GOSSIP_ATTESTATION_BATCH_SIZE,
self.config.max_gossip_attestation_batch_size,
);

if batch_size < 2 {
Expand Down Expand Up @@ -1005,6 +1084,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = gossip_bls_to_execution_change_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check the priority 1 API requests after we've
// processed all the interesting things from the network
// and things required for us to stay in good repute
// with our P2P peers.
} else if let Some(item) = api_request_p1_queue.pop() {
self.spawn_worker(item, idle_tx);
// Handle backfill sync chain segments.
} else if let Some(item) = backfill_chain_segment.pop() {
self.spawn_worker(item, idle_tx);
Expand Down Expand Up @@ -1127,6 +1212,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::UnknownLightClientOptimisticUpdate { .. } => {
unknown_light_client_update_queue.push(work, work_id, &self.log)
}
Work::ApiRequestP0 { .. } => {
api_request_p0_queue.push(work, work_id, &self.log)
}
Work::ApiRequestP1 { .. } => {
api_request_p1_queue.push(work, work_id, &self.log)
}
}
}
}
Expand Down Expand Up @@ -1183,6 +1274,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
&metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_QUEUE_TOTAL,
gossip_bls_to_execution_change_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_API_REQUEST_P0_QUEUE_TOTAL,
api_request_p0_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_API_REQUEST_P1_QUEUE_TOTAL,
api_request_p1_queue.len() as i64,
);

if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
error!(
Expand Down Expand Up @@ -1299,6 +1398,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
task_spawner.spawn_blocking_with_manual_send_idle(work)
}
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
},
Work::GossipVoluntaryExit(process_fn)
| Work::GossipProposerSlashing(process_fn)
| Work::GossipAttesterSlashing(process_fn)
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/beacon_processor/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ lazy_static::lazy_static! {
"beacon_processor_sync_contribution_queue_total",
"Count of sync committee contributions waiting to be processed."
);
// HTTP API requests.
pub static ref BEACON_PROCESSOR_API_REQUEST_P0_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_api_request_p0_queue_total",
"Count of P0 HTTP requesets waiting to be processed."
);
pub static ref BEACON_PROCESSOR_API_REQUEST_P1_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_api_request_p1_queue_total",
"Count of P1 HTTP requesets waiting to be processed."
);

/*
* Attestation reprocessing queue metrics.
Expand Down
Loading

0 comments on commit 8dfeb3c

Please sign in to comment.