Skip to content

Commit

Permalink
Add timing for block availability (#5510)
Browse files Browse the repository at this point in the history
* Add timing for block availability

* Attestation metrics analysis

* Prettier printing

* Add some metrics and timings to track late blocks

* Update to latest unstable

* fmt

* Merge latest unstable

* Small tweaks

* Try pushing blob timing down into verification

* Simplify for clippy
  • Loading branch information
michaelsproul authored Apr 23, 2024
1 parent 82b131d commit 72a3360
Show file tree
Hide file tree
Showing 12 changed files with 391 additions and 156 deletions.
65 changes: 33 additions & 32 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2953,7 +2953,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

/// Wraps `process_block` in logic to cache the block's commitments in the processing cache
/// and evict if the block was imported or erred.
/// and evict if the block was imported or errored.
pub async fn process_block_with_early_caching<B: IntoExecutionPendingBlock<T>>(
self: &Arc<Self>,
block_root: Hash256,
Expand Down Expand Up @@ -2998,22 +2998,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Increment the Prometheus counter for block processing requests.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);

let block_slot = unverified_block.block().slot();

// Set observed time if not already set. Usually this should be set by gossip or RPC,
// but just in case we set it again here (useful for tests).
if let (Some(seen_timestamp), Some(current_slot)) =
(self.slot_clock.now_duration(), self.slot_clock.now())
{
if let Some(seen_timestamp) = self.slot_clock.now_duration() {
self.block_times_cache.write().set_time_observed(
block_root,
current_slot,
block_slot,
seen_timestamp,
None,
None,
);
}

let block_slot = unverified_block.block().slot();

// A small closure to group the verification and import errors.
let chain = self.clone();
let import_block = async move {
Expand All @@ -3024,6 +3022,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)?;
publish_fn()?;
let executed_block = chain.into_executed_block(execution_pending).await?;
// Record the time it took to ask the execution layer.
if let Some(seen_timestamp) = self.slot_clock.now_duration() {
self.block_times_cache.write().set_execution_time(
block_root,
block_slot,
seen_timestamp,
)
}

match executed_block {
ExecutedBlock::Available(block) => {
self.import_available_block(Box::new(block)).await
Expand Down Expand Up @@ -3090,8 +3097,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

/// Accepts a fully-verified block and awaits on it's payload verification handle to
/// get a fully `ExecutedBlock`
/// Accepts a fully-verified block and awaits on its payload verification handle to
/// get a fully `ExecutedBlock`.
///
/// An error is returned if the verification handle couldn't be awaited.
pub async fn into_executed_block(
Expand Down Expand Up @@ -3224,10 +3231,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
match availability {
Availability::Available(block) => {
// This is the time since start of the slot where all the components of the block have become available
let delay =
get_slot_delay_ms(timestamp_now(), block.block.slot(), &self.slot_clock);
metrics::observe_duration(&metrics::BLOCK_AVAILABILITY_DELAY, delay);
// Block is fully available, import into fork choice
self.import_available_block(block).await
}
Expand Down Expand Up @@ -3256,6 +3259,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
consensus_context,
} = import_data;

// Record the time at which this block's blobs became available.
if let Some(blobs_available) = block.blobs_available_timestamp() {
self.block_times_cache.write().set_time_blob_observed(
block_root,
block.slot(),
blobs_available,
);
}

// import
let chain = self.clone();
let block_root = self
Expand Down Expand Up @@ -3396,6 +3408,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"Early attester cache insert failed";
"error" => ?e
);
} else {
let attestable_timestamp =
self.slot_clock.now_duration().unwrap_or_default();
self.block_times_cache.write().set_time_attestable(
block_root,
signed_block.slot(),
attestable_timestamp,
)
}
} else {
warn!(
Expand Down Expand Up @@ -3885,25 +3905,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}

// Do not store metrics if the block was > 4 slots old, this helps prevent noise during
// sync.
if block_delay_total < self.slot_clock.slot_duration() * 4 {
// Observe the delay between when we observed the block and when we imported it.
let block_delays = self.block_times_cache.read().get_block_delays(
block_root,
self.slot_clock
.start_of(current_slot)
.unwrap_or_else(|| Duration::from_secs(0)),
);

metrics::observe_duration(
&metrics::BEACON_BLOCK_IMPORTED_OBSERVED_DELAY_TIME,
block_delays
.imported
.unwrap_or_else(|| Duration::from_secs(0)),
);
}

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_block_subscribers() {
event_handler.register(EventKind::Block(SseBlock {
Expand Down
42 changes: 34 additions & 8 deletions beacon_node/beacon_chain/src/blob_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use merkle_proof::MerkleTreeError;
use slog::{debug, warn};
use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
use std::time::Duration;
use tree_hash::TreeHash;
use types::blob_sidecar::BlobIdentifier;
use types::{
Expand Down Expand Up @@ -214,7 +215,10 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
pub fn __assumed_valid(blob: Arc<BlobSidecar<T::EthSpec>>) -> Self {
Self {
block_root: blob.block_root(),
blob: KzgVerifiedBlob { blob },
blob: KzgVerifiedBlob {
blob,
seen_timestamp: Duration::from_secs(0),
},
}
}
pub fn id(&self) -> BlobIdentifier {
Expand Down Expand Up @@ -260,6 +264,8 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
#[ssz(struct_behaviour = "transparent")]
pub struct KzgVerifiedBlob<E: EthSpec> {
blob: Arc<BlobSidecar<E>>,
#[ssz(skip_serializing, skip_deserializing)]
seen_timestamp: Duration,
}

impl<E: EthSpec> PartialOrd for KzgVerifiedBlob<E> {
Expand All @@ -275,8 +281,12 @@ impl<E: EthSpec> Ord for KzgVerifiedBlob<E> {
}

impl<E: EthSpec> KzgVerifiedBlob<E> {
pub fn new(blob: Arc<BlobSidecar<E>>, kzg: &Kzg) -> Result<Self, KzgError> {
verify_kzg_for_blob(blob, kzg)
pub fn new(
blob: Arc<BlobSidecar<E>>,
kzg: &Kzg,
seen_timestamp: Duration,
) -> Result<Self, KzgError> {
verify_kzg_for_blob(blob, kzg, seen_timestamp)
}
pub fn to_blob(self) -> Arc<BlobSidecar<E>> {
self.blob
Expand All @@ -294,12 +304,18 @@ impl<E: EthSpec> KzgVerifiedBlob<E> {
pub fn blob_index(&self) -> u64 {
self.blob.index
}
pub fn seen_timestamp(&self) -> Duration {
self.seen_timestamp
}
/// Construct a `KzgVerifiedBlob` that is assumed to be valid.
///
/// This should ONLY be used for testing.
#[cfg(test)]
pub fn __assumed_valid(blob: Arc<BlobSidecar<E>>) -> Self {
Self { blob }
Self {
blob,
seen_timestamp: Duration::from_secs(0),
}
}
}

Expand All @@ -309,9 +325,13 @@ impl<E: EthSpec> KzgVerifiedBlob<E> {
pub fn verify_kzg_for_blob<E: EthSpec>(
blob: Arc<BlobSidecar<E>>,
kzg: &Kzg,
seen_timestamp: Duration,
) -> Result<KzgVerifiedBlob<E>, KzgError> {
validate_blob::<E>(kzg, &blob.blob, blob.kzg_commitment, blob.kzg_proof)?;
Ok(KzgVerifiedBlob { blob })
Ok(KzgVerifiedBlob {
blob,
seen_timestamp,
})
}

pub struct KzgVerifiedBlobList<E: EthSpec> {
Expand All @@ -322,13 +342,17 @@ impl<E: EthSpec> KzgVerifiedBlobList<E> {
pub fn new<I: IntoIterator<Item = Arc<BlobSidecar<E>>>>(
blob_list: I,
kzg: &Kzg,
seen_timestamp: Duration,
) -> Result<Self, KzgError> {
let blobs = blob_list.into_iter().collect::<Vec<_>>();
verify_kzg_for_blob_list(blobs.iter(), kzg)?;
Ok(Self {
verified_blobs: blobs
.into_iter()
.map(|blob| KzgVerifiedBlob { blob })
.map(|blob| KzgVerifiedBlob {
blob,
seen_timestamp,
})
.collect(),
})
}
Expand Down Expand Up @@ -374,6 +398,8 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
let blob_epoch = blob_slot.epoch(T::EthSpec::slots_per_epoch());
let signed_block_header = &blob_sidecar.signed_block_header;

let seen_timestamp = chain.slot_clock.now_duration().unwrap_or_default();

// This condition is not possible if we have received the blob from the network
// since we only subscribe to `MaxBlobsPerBlock` subnets over gossip network.
// We include this check only for completeness.
Expand Down Expand Up @@ -641,8 +667,8 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
.kzg
.as_ref()
.ok_or(GossipBlobError::KzgNotInitialized)?;
let kzg_verified_blob =
KzgVerifiedBlob::new(blob_sidecar, kzg).map_err(GossipBlobError::KzgError)?;
let kzg_verified_blob = KzgVerifiedBlob::new(blob_sidecar, kzg, seen_timestamp)
.map_err(GossipBlobError::KzgError)?;

Ok(GossipVerifiedBlob {
block_root,
Expand Down
89 changes: 88 additions & 1 deletion beacon_node/beacon_chain/src/block_times_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,35 @@ type BlockRoot = Hash256;
#[derive(Clone, Default)]
pub struct Timestamps {
pub observed: Option<Duration>,
pub all_blobs_observed: Option<Duration>,
pub execution_time: Option<Duration>,
pub attestable: Option<Duration>,
pub imported: Option<Duration>,
pub set_as_head: Option<Duration>,
}

// Helps arrange delay data so it is more relevant to metrics.
#[derive(Debug, Default)]
pub struct BlockDelays {
/// Time after start of slot we saw the block.
pub observed: Option<Duration>,
/// The time after the start of the slot we saw all blobs.
pub all_blobs_observed: Option<Duration>,
/// The time it took to get verification from the EL for the block.
pub execution_time: Option<Duration>,
/// The delay from the start of the slot before the block became available
///
/// Equal to max(`observed + execution_time`, `all_blobs_observed`).
pub available: Option<Duration>,
/// Time after `available`.
pub attestable: Option<Duration>,
/// Time
/// ALSO time after `available`.
///
/// We need to use `available` again rather than `attestable` to handle the case where the block
/// does not get added to the early-attester cache.
pub imported: Option<Duration>,
/// Time after `imported`.
pub set_as_head: Option<Duration>,
}

Expand All @@ -35,14 +55,34 @@ impl BlockDelays {
let observed = times
.observed
.and_then(|observed_time| observed_time.checked_sub(slot_start_time));
let all_blobs_observed = times
.all_blobs_observed
.and_then(|all_blobs_observed| all_blobs_observed.checked_sub(slot_start_time));
let execution_time = times
.execution_time
.and_then(|execution_time| execution_time.checked_sub(times.observed?));
// Duration since UNIX epoch at which block became available.
let available_time = times.execution_time.map(|execution_time| {
std::cmp::max(execution_time, times.all_blobs_observed.unwrap_or_default())
});
// Duration from the start of the slot until the block became available.
let available_delay =
available_time.and_then(|available_time| available_time.checked_sub(slot_start_time));
let attestable = times
.attestable
.and_then(|attestable_time| attestable_time.checked_sub(slot_start_time));
let imported = times
.imported
.and_then(|imported_time| imported_time.checked_sub(times.observed?));
.and_then(|imported_time| imported_time.checked_sub(available_time?));
let set_as_head = times
.set_as_head
.and_then(|set_as_head_time| set_as_head_time.checked_sub(times.imported?));
BlockDelays {
observed,
all_blobs_observed,
execution_time,
available: available_delay,
attestable,
imported,
set_as_head,
}
Expand Down Expand Up @@ -109,6 +149,53 @@ impl BlockTimesCache {
}
}

pub fn set_time_blob_observed(
&mut self,
block_root: BlockRoot,
slot: Slot,
timestamp: Duration,
) {
let block_times = self
.cache
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
if block_times
.timestamps
.all_blobs_observed
.map_or(true, |prev| timestamp > prev)
{
block_times.timestamps.all_blobs_observed = Some(timestamp);
}
}

pub fn set_execution_time(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
let block_times = self
.cache
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
if block_times
.timestamps
.execution_time
.map_or(true, |prev| timestamp < prev)
{
block_times.timestamps.execution_time = Some(timestamp);
}
}

pub fn set_time_attestable(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
let block_times = self
.cache
.entry(block_root)
.or_insert_with(|| BlockTimesCacheValue::new(slot));
if block_times
.timestamps
.attestable
.map_or(true, |prev| timestamp < prev)
{
block_times.timestamps.attestable = Some(timestamp);
}
}

pub fn set_time_imported(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
let block_times = self
.cache
Expand Down
3 changes: 1 addition & 2 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,7 @@ type PayloadVerificationHandle<E> =
/// - Parent is known
/// - Signatures
/// - State root check
/// - Per block processing
/// - Blobs sidecar has been validated if present
/// - Block processing
///
/// Note: a `ExecutionPendingBlock` is not _forever_ valid to be imported, it may later become invalid
/// due to finality or some other event. A `ExecutionPendingBlock` should be imported into the
Expand Down
Loading

0 comments on commit 72a3360

Please sign in to comment.