Skip to content

Commit

Permalink
Fix issues with thread count:
Browse files Browse the repository at this point in the history
- Update Rayon to the latest version
- Replace Rayon with std::thread in state transitions to reduce dependency on the Rayon library
- Properly disable HiGHS parallelism
- Add DebugAttestationPacker feature for more granular AttestationPacker debugging
- Update AttestationPacker metrics in Grafana
- Add thread count metrics
  • Loading branch information
Tumas committed Jul 1, 2024
1 parent 3209317 commit ec17aae
Show file tree
Hide file tree
Showing 17 changed files with 574 additions and 263 deletions.
14 changes: 12 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ num-bigint = '0.4.4'
num-integer = '0.1.45'
num-traits = '0.2.17'
num_cpus = '1.16.0'
num_threads = '0.1.7'
once_cell = '1.19.0'
openssl = '0.10.63'
parking_lot = '0.12.1'
Expand All @@ -362,7 +363,7 @@ quickcheck = '1.0.3'
quickcheck_macros = '1.0.0'
quote = '1.0.35'
rand = '0.8.5'
rayon = '1.8.1'
rayon = '1.10.0'
rc-box = '1.2.0'
refinery = { version = '0.8.12', features = ['rusqlite']}
regex = '1.10.3'
Expand Down
7 changes: 6 additions & 1 deletion features/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use core::{
};

use enum_iterator::Sequence;
use log::info;
use log::{info, warn};
use parse_display::{Display, FromStr};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -41,6 +41,7 @@ pub enum Feature {
AlwaysPrepackAttestations,
AlwaysPrepareExecutionPayload,
CacheTargetStates,
DebugAttestationPacker,
DebugEth1,
DebugP2p,
IgnoreAttestationsForUnknownBlocks,
Expand Down Expand Up @@ -98,6 +99,10 @@ impl Feature {
// Maybe `log::kv` will be stable someday. Or we could implement it ourselves.
info!("[{self}] {message}");
}

/// Emits `message` through the `warn!` logging macro, prefixed with this
/// feature's `Display` form wrapped in square brackets.
pub fn warn(self, message: impl Display) {
    warn!("[{}] {}", self, message);
}
}

#[macro_export]
Expand Down
1 change: 1 addition & 0 deletions metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ helper_functions = { workspace = true }
http_api_utils = { workspace = true }
jemalloc-ctl = { workspace = true }
log = { workspace = true }
num_threads = { workspace = true }
p2p = { workspace = true }
prometheus = { workspace = true }
# `prometheus-client` is only needed for libp2p metrics.
Expand Down
6 changes: 6 additions & 0 deletions metrics/src/beaconchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ pub struct ProcessMetrics {
pub client_build: i64,
pub sync_eth2_fallback_configured: bool,
pub sync_eth2_fallback_connected: bool,
#[serde(skip)]
pub thread_count: usize,
}

impl ProcessMetrics {
Expand Down Expand Up @@ -123,6 +125,9 @@ impl ProcessMetrics {
// Grandine does not support Eth2 sync fallbacks.
sync_eth2_fallback_configured: false,
sync_eth2_fallback_connected: false,
thread_count: num_threads::num_threads()
.map(Into::into)
.unwrap_or_default(),
}
}
}
Expand Down Expand Up @@ -393,6 +398,7 @@ mod tests {
client_build: 12,
sync_eth2_fallback_configured: false,
sync_eth2_fallback_connected: false,
thread_count: 32,
};

[
Expand Down
1 change: 1 addition & 0 deletions metrics/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ async fn scrape_system_stats(

let process_metrics = ProcessMetrics::get();

metrics.set_grandine_thread_count(process_metrics.thread_count);
metrics.set_total_cpu_seconds(process_metrics.cpu_process_seconds_total);

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,11 @@ impl<P: Preset> AttestationPacker<P> {
let mut selected_aggregates = Vec::new();

// Parallelization is unnecessary here, since the integer program is solved separately for each distinct attestation data
problem = problem.set_parallel(HighsParallelType::Off);
problem = problem.set_parallel(HighsParallelType::Off).set_threads(1);

problem = problem
.set_time_limit(self.time_until_deadline_in_seconds()? * TIME_FRACTION_FOR_SUBPROBLEM);

let solution = problem.solve()?;

for (i, is_selected) in is_aggregate_selected.iter().enumerate() {
Expand Down
36 changes: 29 additions & 7 deletions operation_pools/src/attestation_agg_pool/tasks.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use core::time::Duration;
use std::sync::Arc;
use std::{sync::Arc, time::Instant};

use anyhow::Result;
use bls::PublicKeyBytes;
use eth1_api::ApiController;
use features::Feature::DebugAttestationPacker;
use fork_choice_control::Wait;
use helper_functions::accessors;
use prometheus_metrics::Metrics;
Expand Down Expand Up @@ -40,11 +41,18 @@ impl<P: Preset, W: Wait> PoolTask for BestProposableAttestationsTask<P, W> {
} = self;

let attestations = pool.best_proposable_attestations(beacon_state.slot()).await;
let slot = controller.slot();

// The pool already returned attestations for this request: hand them back as-is.
if !attestations.is_empty() {
    features::log!(
        DebugAttestationPacker,
        "optimal attestations present for slot: {slot}"
    );
    return Ok(attestations);
}

// `Feature::warn` accepts an `impl Display`, not a format string, so a plain string
// literal would be logged with the text `{slot}` verbatim. `format_args!` performs
// the interpolation without allocating.
DebugAttestationPacker.warn(format_args!("no optimal attestations for slot: {slot}"));

let attestation_packer = AttestationPacker::new(
controller.chain_config().clone_arc(),
controller.head_block_root().value,
Expand Down Expand Up @@ -98,6 +106,7 @@ impl<P: Preset, W: Wait> PoolTask for PackProposableAttestationsTask<P, W> {
} = self;

let beacon_state = controller.preprocessed_state_at_next_slot()?;
let slot = controller.slot() + 1;

let mut attestation_packer = AttestationPacker::new(
controller.chain_config().clone_arc(),
Expand All @@ -107,19 +116,28 @@ impl<P: Preset, W: Wait> PoolTask for PackProposableAttestationsTask<P, W> {
)?;

let mut is_empty = true;
let mut iteration: u32 = 0;

loop {
let PackOutcome {
attestations,
deadline_reached,
} = {
let _timer = metrics.as_ref().map(|metrics| {
metrics
.att_pool_pack_proposable_attestation_task_times
.start_timer()
});
let timer = Instant::now();

let outcome =
pack_attestations_optimally(&attestation_packer, &pool, &beacon_state).await;

pack_attestations_optimally(&attestation_packer, &pool, &beacon_state).await
features::log!(
DebugAttestationPacker,
"pack outcome for slot: {slot}, attestations: {}, iteration: {iteration}, deadline_reached: {}, time: {:?}",
outcome.attestations.len(),
outcome.deadline_reached,
timer.elapsed()
);

iteration += 1;
outcome
};

if is_empty || !deadline_reached {
Expand All @@ -129,6 +147,10 @@ impl<P: Preset, W: Wait> PoolTask for PackProposableAttestationsTask<P, W> {
}

if deadline_reached {
if let Some(metrics) = metrics.as_ref() {
metrics.set_attestation_packer_iteration_count(iteration.saturating_sub(1));
}

break;
}

Expand Down
Loading

0 comments on commit ec17aae

Please sign in to comment.