Use BeaconProcessor for API requests (#4462)
## Issue Addressed

NA

## Proposed Changes

Rather than spawning new tasks on the tokio executor to process each HTTP API request, send the tasks to the `BeaconProcessor`. This achieves:

1. Places a bound on how many concurrent requests are being served (i.e., how many we are actually trying to compute at one time).
1. Places a bound on how many requests can be awaiting a response at one time (i.e., starts dropping requests when we have too many queued).
1. Allows the BN to prioritise HTTP requests with respect to messages coming from the P2P network (i.e., prioritise importing gossip blocks rather than serving API requests); the handler side is sketched below.
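
For a concrete picture, here is a minimal sketch of the handler side. It is not the exact PR code: `compute_response`, `Response`, and `Error` are hypothetical stand-ins, the error-conversion glue is omitted, and the `WorkEvent { drop_during_sync, work }` and `try_send` shapes follow the existing `beacon_processor` types.

```rust
use tokio::sync::oneshot;

// Sketch: wrap the endpoint logic as an async work item and submit it to the
// `BeaconProcessor` instead of spawning a fresh tokio task per request.
async fn handle_api_request<E: EthSpec>(
    beacon_processor_tx: BeaconProcessorSend<E>,
) -> Result<Response, Error> {
    let (response_tx, response_rx) = oneshot::channel();

    // A worker will run this future when it reaches the front of the queue.
    let process_fn: AsyncFn = Box::pin(async move {
        let _ = response_tx.send(compute_response().await);
    });

    // `try_send` fails fast when the queue is full, which is how bounds (1)
    // and (2) are enforced: excess requests are dropped, not queued forever.
    beacon_processor_tx.try_send(WorkEvent {
        drop_during_sync: false,
        work: Work::ApiRequestP0(BlockingOrAsync::Async(process_fn)),
    })?;

    // Wait for the worker to hand the response back.
    Ok(response_rx.await?)
}
```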

Presently there are two levels of priority (the resulting dispatch order is sketched after this list):

- `Priority::P0`
    - The beacon processor will prioritise these above everything other than importing new blocks.
    - Roughly all validator-sensitive endpoints.
- `Priority::P1`
    - The beacon processor will prioritise practically all other P2P messages over these, except for historical backfill work.
    - Everything that's not `Priority::P0`.
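
To make the ordering concrete, here is a simplified sketch of the worker-dispatch order after this PR; the real loop in `beacon_node/beacon_processor/src/lib.rs` interleaves many more queues between these arms:

```rust
// Simplified: each arm pops one queued item and hands it to a free worker.
if let Some(item) = gossip_block_queue.pop() {
    self.spawn_worker(item, idle_tx); // importing new blocks always wins
} else if let Some(item) = api_request_p0_queue.pop() {
    self.spawn_worker(item, idle_tx); // P0: validator-sensitive API requests
} else if let Some(item) = aggregate_queue.pop() {
    self.spawn_worker(item, idle_tx); // ...then aggregates, attestations, etc.
} else if let Some(item) = api_request_p1_queue.pop() {
    self.spawn_worker(item, idle_tx); // P1: all remaining API requests
} else if let Some(item) = backfill_chain_segment.pop() {
    self.spawn_worker(item, idle_tx); // historical backfill comes last
}
```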
    
The `--http-enable-beacon-processor false` flag can be supplied to revert to the old behaviour of spawning a new `tokio` task for each request:

```
        --http-enable-beacon-processor <BOOLEAN>
            The beacon processor is a scheduler which provides quality-of-service and DoS protection. When set to
            "true", HTTP API requests will queued and scheduled alongside other tasks. When set to "false", HTTP API
            responses will be executed immediately. [default: true]
```
    
## New CLI Flags

I added some other new CLI flags:

```
        --beacon-processor-aggregate-batch-size <INTEGER>
            Specifies the number of gossip aggregate attestations in a signature verification batch. Higher values may
            reduce CPU usage in a healthy network while lower values may increase CPU usage in an unhealthy or hostile
            network. [default: 64]
        --beacon-processor-attestation-batch-size <INTEGER>
            Specifies the number of gossip attestations in a signature verification batch. Higher values may reduce CPU
            usage in a healthy network whilst lower values may increase CPU usage in an unhealthy or hostile network.
            [default: 64]
        --beacon-processor-max-workers <INTEGER>
            Specifies the maximum concurrent tasks for the task scheduler. Increasing this value may increase resource
            consumption. Reducing the value may result in decreased resource usage and diminished performance. The
            default value is the number of logical CPU cores on the host.
        --beacon-processor-reprocess-queue-len <INTEGER>
            Specifies the length of the queue for messages requiring delayed processing. Higher values may prevent
            messages from being dropped while lower values may help protect the node from becoming overwhelmed.
            [default: 12288]
```


I needed to add the max-workers flag since the "simulator" flavor tests started failing with HTTP timeouts on the test assertions. I believe they were failing because the GitHub runners only have 2 cores and there just weren't enough workers available to process our requests in time. I added the other flags since they seem fun to fiddle with.
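
For reference, a hedged sketch of how these flags map onto the new `BeaconProcessorConfig`; the field names are taken from the diff below, while the CLI-parsing glue is assumed:

```rust
// Assumed flag-to-field mapping. Note the reprocess-queue flag's default of
// 12_288 matches `DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN` (3 * 16_384 / 4).
let config = BeaconProcessorConfig {
    max_workers: 2,                        // --beacon-processor-max-workers
    max_gossip_attestation_batch_size: 32, // --beacon-processor-attestation-batch-size
    max_gossip_aggregate_batch_size: 32,   // --beacon-processor-aggregate-batch-size
    max_scheduled_work_queue_len: 12_288,  // --beacon-processor-reprocess-queue-len
    ..BeaconProcessorConfig::default()
};
```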

## Additional Info

I bumped the timeouts on the "simulator" flavor test from 4s to 8s. The prioritisation of consensus messages seems to be causing slower responses; I guess this is what we signed up for 🤷

The `validator/register` endpoint has some special handling because the relays have a bad habit of timing out on these calls. It seems like a waste of a `BeaconProcessor` worker to just sit waiting for the builder API HTTP response, so we spawn a new `tokio` task to wait for the builder response (sketched below).
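
A rough sketch of that special case, where `register_with_builder` is a hypothetical stand-in for the real builder call and `executor`, `builder_client`, `registrations`, and `log` are assumed context:

```rust
// The worker does the cheap validation up-front, then hands the slow (and
// timeout-prone) relay HTTP call to a plain tokio task so the
// `BeaconProcessor` worker is freed immediately.
let registration_future = async move {
    match register_with_builder(&builder_client, registrations).await {
        Ok(()) => debug!(log, "Builder accepted validator registrations"),
        Err(e) => warn!(log, "Builder registration failed"; "error" => ?e),
    }
};
executor.spawn(registration_future, "builder_register_validators");
```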

I've added an optimisation for the `GET beacon/states/{state_id}/validators/{validator_id}` endpoint in [efbabe3](efbabe3). That's the endpoint the VC uses to resolve pubkeys to validator indices, and it's the endpoint that was causing us grief. Perhaps I should move that into a new PR, not sure.
paulhauner committed Aug 8, 2023
1 parent 1373dcf commit b60304b
Showing 24 changed files with 1,870 additions and 968 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock


3 changes: 0 additions & 3 deletions beacon_node/beacon_chain/src/chain_config.rs
@@ -79,8 +79,6 @@ pub struct ChainConfig {
///
/// This is useful for block builders and testing.
pub always_prepare_payload: bool,
-/// Whether backfill sync processing should be rate-limited.
-pub enable_backfill_rate_limiting: bool,
/// Whether to use `ProgressiveBalancesCache` in unrealized FFG progression calculation.
pub progressive_balances_mode: ProgressiveBalancesMode,
/// Number of epochs between each migration of data from the hot database to the freezer.
@@ -114,7 +112,6 @@ impl Default for ChainConfig {
shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE,
genesis_backfill: false,
always_prepare_payload: false,
-enable_backfill_rate_limiting: true,
progressive_balances_mode: ProgressiveBalancesMode::Checked,
epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION,
}
4 changes: 3 additions & 1 deletion beacon_node/beacon_processor/Cargo.toml
@@ -21,4 +21,6 @@ types = { path = "../../consensus/types" }
ethereum_ssz = "0.5.0"
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
-parking_lot = "0.12.0"
+parking_lot = "0.12.0"
+num_cpus = "1.13.0"
+serde = { version = "1.0.116", features = ["derive"] }
127 changes: 115 additions & 12 deletions beacon_node/beacon_processor/src/lib.rs
@@ -48,6 +48,7 @@ use lighthouse_network::NetworkGlobals;
use lighthouse_network::{MessageId, PeerId};
use logging::TimeLatch;
use parking_lot::Mutex;
+use serde::{Deserialize, Serialize};
use slog::{crit, debug, error, trace, warn, Logger};
use slot_clock::SlotClock;
use std::cmp;
@@ -70,7 +71,7 @@ pub mod work_reprocessing_queue;
/// The maximum size of the channel for work events to the `BeaconProcessor`.
///
/// Setting this too low will cause consensus messages to be dropped.
-pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
+const DEFAULT_MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;

/// The maximum size of the channel for idle events to the `BeaconProcessor`.
///
@@ -79,7 +80,7 @@ pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
const MAX_IDLE_QUEUE_LEN: usize = 16_384;

/// The maximum size of the channel for re-processing work events.
-pub const MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * MAX_WORK_EVENT_QUEUE_LEN / 4;
+const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4;

/// The maximum number of queued `Attestation` objects that will be stored before we start dropping
/// them.
@@ -167,6 +168,14 @@ const MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN: usize = 16_384;
/// will be stored before we start dropping them.
const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024;

+/// The maximum number of priority-0 (highest priority) messages that will be queued before
+/// they begin to be dropped.
+const MAX_API_REQUEST_P0_QUEUE_LEN: usize = 1_024;
+
+/// The maximum number of priority-1 (second-highest priority) messages that will be queued before
+/// they begin to be dropped.
+const MAX_API_REQUEST_P1_QUEUE_LEN: usize = 1_024;
+
/// The name of the manager tokio task.
const MANAGER_TASK_NAME: &str = "beacon_processor_manager";

@@ -184,8 +193,8 @@ const WORKER_TASK_NAME: &str = "beacon_processor_worker";
/// Poisoning occurs when an invalid signature is included in a batch of attestations. A single
/// invalid signature causes the entire batch to fail. When a batch fails, we fall-back to
/// individually verifying each attestation signature.
-const MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64;
-const MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64;
+const DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64;
+const DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64;

/// Unique IDs used for metrics and testing.
pub const WORKER_FREED: &str = "worker_freed";
@@ -215,6 +224,61 @@ pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update";
pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";
+pub const API_REQUEST_P0: &str = "api_request_p0";
+pub const API_REQUEST_P1: &str = "api_request_p1";
+
+#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
+pub struct BeaconProcessorConfig {
+pub max_workers: usize,
+pub max_work_event_queue_len: usize,
+pub max_scheduled_work_queue_len: usize,
+pub max_gossip_attestation_batch_size: usize,
+pub max_gossip_aggregate_batch_size: usize,
+pub enable_backfill_rate_limiting: bool,
+}
+
+impl Default for BeaconProcessorConfig {
+fn default() -> Self {
+Self {
+max_workers: cmp::max(1, num_cpus::get()),
+max_work_event_queue_len: DEFAULT_MAX_WORK_EVENT_QUEUE_LEN,
+max_scheduled_work_queue_len: DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN,
+max_gossip_attestation_batch_size: DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE,
+max_gossip_aggregate_batch_size: DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE,
+enable_backfill_rate_limiting: true,
+}
+}
+}
+
+// The channels necessary to instantiate a `BeaconProcessor`.
+pub struct BeaconProcessorChannels<E: EthSpec> {
+pub beacon_processor_tx: BeaconProcessorSend<E>,
+pub beacon_processor_rx: mpsc::Receiver<WorkEvent<E>>,
+pub work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage>,
+pub work_reprocessing_rx: mpsc::Receiver<ReprocessQueueMessage>,
+}
+
+impl<E: EthSpec> BeaconProcessorChannels<E> {
+pub fn new(config: &BeaconProcessorConfig) -> Self {
+let (beacon_processor_tx, beacon_processor_rx) =
+mpsc::channel(config.max_scheduled_work_queue_len);
+let (work_reprocessing_tx, work_reprocessing_rx) =
+mpsc::channel(config.max_scheduled_work_queue_len);
+
+Self {
+beacon_processor_tx: BeaconProcessorSend(beacon_processor_tx),
+beacon_processor_rx,
+work_reprocessing_rx,
+work_reprocessing_tx,
+}
+}
+}
+
+impl<E: EthSpec> Default for BeaconProcessorChannels<E> {
+fn default() -> Self {
+Self::new(&BeaconProcessorConfig::default())
+}
+}

/// A simple first-in-first-out queue with a maximum length.
struct FifoQueue<T> {
@@ -363,7 +427,7 @@ impl<E: EthSpec> WorkEvent<E> {
}
}

-impl<E: EthSpec> std::convert::From<ReadyWork> for WorkEvent<E> {
+impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
fn from(ready_work: ReadyWork) -> Self {
match ready_work {
ReadyWork::Block(QueuedGossipBlock {
@@ -465,6 +529,10 @@ impl<E: EthSpec> BeaconProcessorSend<E> {
pub type AsyncFn = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
pub type BlockingFn = Box<dyn FnOnce() + Send + Sync>;
pub type BlockingFnWithManualSendOnIdle = Box<dyn FnOnce(SendOnDrop) + Send + Sync>;
+pub enum BlockingOrAsync {
+Blocking(BlockingFn),
+Async(AsyncFn),
+}

/// Indicates the type of work to be performed and therefore its priority and
/// queuing specifics.
@@ -523,6 +591,8 @@ pub enum Work<E: EthSpec> {
BlocksByRootsRequest(BlockingFnWithManualSendOnIdle),
GossipBlsToExecutionChange(BlockingFn),
LightClientBootstrapRequest(BlockingFn),
+ApiRequestP0(BlockingOrAsync),
+ApiRequestP1(BlockingOrAsync),
}

impl<E: EthSpec> fmt::Debug for Work<E> {
@@ -560,6 +630,8 @@ impl<E: EthSpec> Work<E> {
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE,
Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE,
+Work::ApiRequestP0 { .. } => API_REQUEST_P0,
+Work::ApiRequestP1 { .. } => API_REQUEST_P1,
}
}
}
@@ -638,7 +710,7 @@ pub struct BeaconProcessor<E: EthSpec> {
pub executor: TaskExecutor,
pub max_workers: usize,
pub current_workers: usize,
-pub enable_backfill_rate_limiting: bool,
+pub config: BeaconProcessorConfig,
pub log: Logger,
}

@@ -714,11 +786,13 @@ impl<E: EthSpec> BeaconProcessor<E> {

let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN);

+let mut api_request_p0_queue = FifoQueue::new(MAX_API_REQUEST_P0_QUEUE_LEN);
+let mut api_request_p1_queue = FifoQueue::new(MAX_API_REQUEST_P1_QUEUE_LEN);
+
// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
// receive them back once they are ready (`ready_work_rx`).
let (ready_work_tx, ready_work_rx) =
-mpsc::channel::<ReadyWork>(MAX_SCHEDULED_WORK_QUEUE_LEN);
-
+mpsc::channel::<ReadyWork>(self.config.max_scheduled_work_queue_len);
spawn_reprocess_scheduler(
ready_work_tx,
work_reprocessing_rx,
@@ -739,7 +813,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
reprocess_work_rx: ready_work_rx,
};

-let enable_backfill_rate_limiting = self.enable_backfill_rate_limiting;
+let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting;
let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting;

loop {
let work_event = match inbound_events.next().await {
@@ -850,12 +924,17 @@ impl<E: EthSpec> BeaconProcessor<E> {
// required to verify some attestations.
} else if let Some(item) = gossip_block_queue.pop() {
self.spawn_worker(item, idle_tx);
+// Check the priority 0 API requests after blocks, but before attestations.
+} else if let Some(item) = api_request_p0_queue.pop() {
+self.spawn_worker(item, idle_tx);
// Check the aggregates, *then* the unaggregates since we assume that
// aggregates are more valuable to local validators and effectively give us
// more information with less signature verification time.
} else if aggregate_queue.len() > 0 {
-let batch_size =
-cmp::min(aggregate_queue.len(), MAX_GOSSIP_AGGREGATE_BATCH_SIZE);
+let batch_size = cmp::min(
+aggregate_queue.len(),
+self.config.max_gossip_aggregate_batch_size,
+);

if batch_size < 2 {
// One single aggregate is in the queue, process it individually.
@@ -914,7 +993,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
} else if attestation_queue.len() > 0 {
let batch_size = cmp::min(
attestation_queue.len(),
-MAX_GOSSIP_ATTESTATION_BATCH_SIZE,
+self.config.max_gossip_attestation_batch_size,
);

if batch_size < 2 {
@@ -1005,6 +1084,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = gossip_bls_to_execution_change_queue.pop() {
self.spawn_worker(item, idle_tx);
+// Check the priority 1 API requests after we've
+// processed all the interesting things from the network
+// and things required for us to stay in good repute
+// with our P2P peers.
+} else if let Some(item) = api_request_p1_queue.pop() {
+self.spawn_worker(item, idle_tx);
// Handle backfill sync chain segments.
} else if let Some(item) = backfill_chain_segment.pop() {
self.spawn_worker(item, idle_tx);
@@ -1127,6 +1212,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::UnknownLightClientOptimisticUpdate { .. } => {
unknown_light_client_update_queue.push(work, work_id, &self.log)
}
+Work::ApiRequestP0 { .. } => {
+api_request_p0_queue.push(work, work_id, &self.log)
+}
+Work::ApiRequestP1 { .. } => {
+api_request_p1_queue.push(work, work_id, &self.log)
+}
}
}
}
@@ -1183,6 +1274,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
&metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_QUEUE_TOTAL,
gossip_bls_to_execution_change_queue.len() as i64,
);
+metrics::set_gauge(
+&metrics::BEACON_PROCESSOR_API_REQUEST_P0_QUEUE_TOTAL,
+api_request_p0_queue.len() as i64,
+);
+metrics::set_gauge(
+&metrics::BEACON_PROCESSOR_API_REQUEST_P1_QUEUE_TOTAL,
+api_request_p1_queue.len() as i64,
+);

if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
error!(
@@ -1299,6 +1398,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
task_spawner.spawn_blocking_with_manual_send_idle(work)
}
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
+Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
+BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
+BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
+},
Work::GossipVoluntaryExit(process_fn)
| Work::GossipProposerSlashing(process_fn)
| Work::GossipAttesterSlashing(process_fn)
9 changes: 9 additions & 0 deletions beacon_node/beacon_processor/src/metrics.rs
@@ -100,6 +100,15 @@ lazy_static::lazy_static! {
"beacon_processor_sync_contribution_queue_total",
"Count of sync committee contributions waiting to be processed."
);
+// HTTP API requests.
+pub static ref BEACON_PROCESSOR_API_REQUEST_P0_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
+"beacon_processor_api_request_p0_queue_total",
+"Count of P0 HTTP requests waiting to be processed."
+);
+pub static ref BEACON_PROCESSOR_API_REQUEST_P1_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
+"beacon_processor_api_request_p1_queue_total",
+"Count of P1 HTTP requests waiting to be processed."
+);

/*
* Attestation reprocessing queue metrics.
