Skip to content

Commit

Permalink
Fix beacon-processor-max-workers (sigp#4636)
Browse files Browse the repository at this point in the history
## Issue Addressed

Fixes a bug in the handling of `--beacon-process-max-workers` which caused it to have no effect.

## Proposed Changes

For this PR I channeled @ethDreamer and saw deep into the faulty CLI config -- this bug is almost identical to the one Mark found and fixed in sigp#4622.
  • Loading branch information
michaelsproul authored and Woodpile37 committed Jan 6, 2024
1 parent a9644f4 commit fc0fbad
Show file tree
Hide file tree
Showing 7 changed files with 10 additions and 17 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,6 @@ pub struct BeaconProcessor<T: BeaconChainTypes> {
pub sync_tx: mpsc::UnboundedSender<SyncMessage<T::EthSpec>>,
pub network_globals: Arc<NetworkGlobals<T::EthSpec>>,
pub executor: TaskExecutor,
pub max_workers: usize,
pub current_workers: usize,
pub importing_blocks: DuplicateCache,
pub log: Logger,
Expand All @@ -923,7 +922,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
/// - Performed immediately, if a worker is available.
/// - Queued for later processing, if no worker is currently available.
///
/// Only `self.max_workers` will ever be spawned at one time. Each worker is a `tokio` task
/// Only `self.config.max_workers` will ever be spawned at one time. Each worker is a `tokio` task
/// started with `spawn_blocking`.
///
/// The optional `work_journal_tx` allows for an outside process to receive a log of all work
Expand Down Expand Up @@ -1044,7 +1043,7 @@ impl<T: BeaconChainTypes> BeaconProcessor<T> {
let _ = work_journal_tx.try_send(id);
}

let can_spawn = self.current_workers < self.max_workers;
let can_spawn = self.current_workers < self.config.max_workers;
let drop_during_sync = work_event
.as_ref()
.map_or(false, |event| event.drop_during_sync);
Expand Down
1 change: 0 additions & 1 deletion beacon_node/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,3 @@ slasher_service = { path = "../../slasher/service" }
monitoring_api = {path = "../../common/monitoring_api"}
execution_layer = { path = "../execution_layer" }
beacon_processor = { path = "../beacon_processor" }
num_cpus = "1.13.0"
2 changes: 0 additions & 2 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use network::{NetworkConfig, NetworkSenders, NetworkService};
use slasher::Slasher;
use slasher_service::SlasherService;
use slog::{debug, info, warn, Logger};
use std::cmp;
use std::net::TcpListener;
use std::path::{Path, PathBuf};
use std::sync::Arc;
Expand Down Expand Up @@ -765,7 +764,6 @@ where
BeaconProcessor {
network_globals: network_globals.clone(),
executor: beacon_processor_context.executor.clone(),
max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0,
config: beacon_processor_config,
log: beacon_processor_context.log().clone(),
Expand Down
14 changes: 8 additions & 6 deletions beacon_node/http_api/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,14 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
let eth1_service =
eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone()).unwrap();

let beacon_processor_config = BeaconProcessorConfig::default();
let beacon_processor_config = BeaconProcessorConfig {
// The number of workers must be greater than one. Tests which use the
// builder workflow sometimes require an internal HTTP request in order
// to fulfill an already in-flight HTTP request, therefore having only
// one worker will result in a deadlock.
max_workers: 2,
..BeaconProcessorConfig::default()
};
let BeaconProcessorChannels {
beacon_processor_tx,
beacon_processor_rx,
Expand All @@ -193,11 +200,6 @@ pub async fn create_api_server_on_port<T: BeaconChainTypes>(
BeaconProcessor {
network_globals: network_globals.clone(),
executor: test_runtime.task_executor.clone(),
// The number of workers must be greater than one. Tests which use the
// builder workflow sometimes require an internal HTTP request in order
// to fulfill an already in-flight HTTP request, therefore having only
// one worker will result in a deadlock.
max_workers: 2,
current_workers: 0,
config: beacon_processor_config,
log: log.clone(),
Expand Down
1 change: 0 additions & 1 deletion beacon_node/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ logging = { path = "../../common/logging" }
task_executor = { path = "../../common/task_executor" }
igd = "0.12.1"
itertools = "0.10.0"
num_cpus = "1.13.0"
lru_cache = { path = "../../common/lru_cache" }
if-addrs = "0.6.4"
strum = "0.24.0"
Expand Down
2 changes: 0 additions & 2 deletions beacon_node/network/src/network_beacon_processor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use lighthouse_network::{
MessageId, NetworkGlobals, PeerId,
};
use slot_clock::SlotClock;
use std::cmp;
use std::iter::Iterator;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -200,7 +199,6 @@ impl TestRig {
sync_tx,
network_globals,
executor,
max_workers: cmp::max(1, num_cpus::get()),
current_workers: 0,
importing_blocks: duplicate_cache.clone(),
invalid_block_storage: InvalidBlockStorage::Disabled,
Expand Down

0 comments on commit fc0fbad

Please sign in to comment.