Skip to content

Commit

Permalink
Add attestation simulator (#4880)
Browse files Browse the repository at this point in the history
* basic scaffold

* remove unnecessary ?

* check if committee cache is init

* typed ValidatorMonitor with ethspecs + store attestations within

* nits

* process unaggregated attestation

* typo

* extract in func

* add tests

* better naming

* better naming 2

* less verbose

* use same naming as validator monitor

* use attestation_simulator

* add metrics

* remove cache

* refacto flag_indices process

* add lag

* remove copying state

* clean and lint

* extract metrics

* nits

* compare prom metrics in tests

* implement lag

* nits

* nits

* add attestation simulator service

* fmt

* return beacon_chain as arc

* nit: debug

* sed s/unaggregated/unagg.//

* fmt

* fmt

* nit: remove unused comments

* increase max unaggregated attestation hashmap to 64

* nit: sed s/clone/copied//

* improve perf: remove unecessary hashmap copy

* fix flag indices comp

* start service in client builder

* remove //

* cargo fmt

* lint

* cloned keys

* fmt

* use Slot value instead of pointer

* Update beacon_node/beacon_chain/src/attestation_simulator.rs

Co-authored-by: Paul Hauner <paul@paulhauner.com>

---------

Co-authored-by: Paul Hauner <paul@paulhauner.com>
  • Loading branch information
v4lproik and paulhauner authored Dec 14, 2023
1 parent a3a3703 commit 189430a
Show file tree
Hide file tree
Showing 9 changed files with 397 additions and 11 deletions.
93 changes: 93 additions & 0 deletions beacon_node/beacon_chain/src/attestation_simulator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use crate::{BeaconChain, BeaconChainTypes};
use slog::{debug, error};
use slot_clock::SlotClock;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::time::sleep;
use types::Slot;

/// Spawns a routine which produces an unaggregated attestation at every slot.
///
/// This routine will run once per slot
pub fn start_attestation_simulator_service<T: BeaconChainTypes>(
executor: TaskExecutor,
chain: Arc<BeaconChain<T>>,
) {
executor.clone().spawn(
async move { attestation_simulator_service(executor, chain).await },
"attestation_simulator_service",
);
}

/// Loop indefinitely, calling `BeaconChain::produce_unaggregated_attestation` every 4s into each slot.
async fn attestation_simulator_service<T: BeaconChainTypes>(
executor: TaskExecutor,
chain: Arc<BeaconChain<T>>,
) {
let slot_duration = chain.slot_clock.slot_duration();
let additional_delay = slot_duration / 3;

loop {
match chain.slot_clock.duration_to_next_slot() {
Some(duration) => {
sleep(duration + additional_delay).await;

debug!(
chain.log,
"Simulating unagg. attestation production";
);

// Run the task in the executor
let inner_chain = chain.clone();
executor.spawn(
async move {
if let Ok(current_slot) = inner_chain.slot() {
produce_unaggregated_attestation(inner_chain, current_slot);
}
},
"attestation_simulator_service",
);
}
None => {
error!(chain.log, "Failed to read slot clock");
// If we can't read the slot clock, just wait another slot.
sleep(slot_duration).await;
}
};
}
}

pub fn produce_unaggregated_attestation<T: BeaconChainTypes>(
inner_chain: Arc<BeaconChain<T>>,
current_slot: Slot,
) {
// Since attestations for different committees are practically identical (apart from the committee index field)
// Committee 0 is guaranteed to exist. That means there's no need to load the committee.
let beacon_committee_index = 0;

// Store the unaggregated attestation in the validator monitor for later processing
match inner_chain.produce_unaggregated_attestation(current_slot, beacon_committee_index) {
Ok(unaggregated_attestation) => {
let data = &unaggregated_attestation.data;

debug!(
inner_chain.log,
"Produce unagg. attestation";
"attestation_source" => data.source.root.to_string(),
"attestation_target" => data.target.root.to_string(),
);

inner_chain
.validator_monitor
.write()
.set_unaggregated_attestation(unaggregated_attestation);
}
Err(e) => {
debug!(
inner_chain.log,
"Failed to simulate attestation";
"error" => ?e
);
}
}
}
8 changes: 5 additions & 3 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3621,9 +3621,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

// Allow the validator monitor to learn about a new valid state.
self.validator_monitor
.write()
.process_valid_state(current_slot.epoch(T::EthSpec::slots_per_epoch()), state);
self.validator_monitor.write().process_valid_state(
current_slot.epoch(T::EthSpec::slots_per_epoch()),
state,
&self.spec,
);

let validator_monitor = self.validator_monitor.read();

Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ where
validator_monitor.process_valid_state(
slot.epoch(TEthSpec::slots_per_epoch()),
&head_snapshot.beacon_state,
&self.spec,
);
}

Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod attestation_rewards;
pub mod attestation_simulator;
pub mod attestation_verification;
mod attester_cache;
pub mod beacon_block_reward;
Expand Down
56 changes: 56 additions & 0 deletions beacon_node/beacon_chain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ use types::{BeaconState, Epoch, EthSpec, Hash256, Slot};
/// The maximum time to wait for the snapshot cache lock during a metrics scrape.
const SNAPSHOT_CACHE_TIMEOUT: Duration = Duration::from_millis(100);

// Attestation simulator metrics
pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_HIT_TOTAL: &str =
"validator_monitor_attestation_simulator_head_attester_hit_total";
pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_MISS_TOTAL: &str =
"validator_monitor_attestation_simulator_head_attester_miss_total";
pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_HIT_TOTAL: &str =
"validator_monitor_attestation_simulator_target_attester_hit_total";
pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_MISS_TOTAL: &str =
"validator_monitor_attestation_simulator_target_attester_miss_total";
pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_HIT_TOTAL: &str =
"validator_monitor_attestation_simulator_source_attester_hit_total";
pub const VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS_TOTAL: &str =
"validator_monitor_attestation_simulator_source_attester_miss_total";

lazy_static! {
/*
* Block Processing
Expand Down Expand Up @@ -1045,6 +1059,48 @@ lazy_static! {
"beacon_aggregated_attestation_subsets_total",
"Count of new aggregated attestations that are subsets of already known aggregates"
);
/*
* Attestation simulator metrics
*/
pub static ref VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_HIT: Result<IntCounter> =
try_create_int_counter(
VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_HIT_TOTAL,
"Incremented if a validator is flagged as a previous slot head attester \
during per slot processing",
);
pub static ref VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_MISS: Result<IntCounter> =
try_create_int_counter(
VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_HEAD_ATTESTER_MISS_TOTAL,
"Incremented if a validator is not flagged as a previous slot head attester \
during per slot processing",
);
pub static ref VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_HIT: Result<IntCounter> =
try_create_int_counter(
VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_HIT_TOTAL,
"Incremented if a validator is flagged as a previous slot target attester \
during per slot processing",
);
pub static ref VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_MISS: Result<IntCounter> =
try_create_int_counter(
VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_TARGET_ATTESTER_MISS_TOTAL,
"Incremented if a validator is not flagged as a previous slot target attester \
during per slot processing",
);
pub static ref VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_HIT: Result<IntCounter> =
try_create_int_counter(
VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_HIT_TOTAL,
"Incremented if a validator is flagged as a previous slot source attester \
during per slot processing",
);
pub static ref VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS: Result<IntCounter> =
try_create_int_counter(
VALIDATOR_MONITOR_ATTESTATION_SIMULATOR_SOURCE_ATTESTER_MISS_TOTAL,
"Incremented if a validator is not flagged as a previous slot source attester \
during per slot processing",
);
/*
* Missed block metrics
*/
pub static ref VALIDATOR_MONITOR_MISSED_BLOCKS_TOTAL: Result<IntCounterVec> = try_create_int_counter_vec(
"validator_monitor_missed_blocks_total",
"Number of non-finalized blocks missed",
Expand Down
Loading

0 comments on commit 189430a

Please sign in to comment.