Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timing for block availability #5510

Merged
merged 10 commits into from
Apr 23, 2024
Merged
65 changes: 33 additions & 32 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2953,7 +2953,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

/// Wraps `process_block` in logic to cache the block's commitments in the processing cache
/// and evict if the block was imported or erred.
/// and evict if the block was imported or errored.
pub async fn process_block_with_early_caching<B: IntoExecutionPendingBlock<T>>(
self: &Arc<Self>,
block_root: Hash256,
Expand Down Expand Up @@ -2998,22 +2998,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Increment the Prometheus counter for block processing requests.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);

let block_slot = unverified_block.block().slot();

// Set observed time if not already set. Usually this should be set by gossip or RPC,
// but just in case we set it again here (useful for tests).
if let (Some(seen_timestamp), Some(current_slot)) =
(self.slot_clock.now_duration(), self.slot_clock.now())
{
if let Some(seen_timestamp) = self.slot_clock.now_duration() {
self.block_times_cache.write().set_time_observed(
block_root,
current_slot,
block_slot,
seen_timestamp,
None,
None,
);
}

let block_slot = unverified_block.block().slot();

// A small closure to group the verification and import errors.
let chain = self.clone();
let import_block = async move {
Expand All @@ -3024,6 +3022,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)?;
publish_fn()?;
let executed_block = chain.into_executed_block(execution_pending).await?;
// Record the time it took to ask the execution layer.
if let Some(seen_timestamp) = self.slot_clock.now_duration() {
self.block_times_cache.write().set_execution_time(
block_root,
block_slot,
seen_timestamp,
)
}

match executed_block {
ExecutedBlock::Available(block) => {
self.import_available_block(Box::new(block)).await
Expand Down Expand Up @@ -3090,8 +3097,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

/// Accepts a fully-verified block and awaits on it's payload verification handle to
/// get a fully `ExecutedBlock`
/// Accepts a fully-verified block and awaits on its payload verification handle to
/// get a fully `ExecutedBlock`.
///
/// An error is returned if the verification handle couldn't be awaited.
pub async fn into_executed_block(
Expand Down Expand Up @@ -3224,10 +3231,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
match availability {
Availability::Available(block) => {
// This is the time since start of the slot where all the components of the block have become available
let delay =
get_slot_delay_ms(timestamp_now(), block.block.slot(), &self.slot_clock);
metrics::observe_duration(&metrics::BLOCK_AVAILABILITY_DELAY, delay);
// Block is fully available, import into fork choice
self.import_available_block(block).await
}
Expand Down Expand Up @@ -3256,6 +3259,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
consensus_context,
} = import_data;

// Record the time at which this block's blobs became available.
if let Some(blobs_available) = block.blobs_available_timestamp() {
self.block_times_cache.write().set_time_blob_observed(
block_root,
block.slot(),
blobs_available,
);
}

// import
let chain = self.clone();
let block_root = self
Expand Down Expand Up @@ -3396,6 +3408,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"Early attester cache insert failed";
"error" => ?e
);
} else {
let attestable_timestamp =
self.slot_clock.now_duration().unwrap_or_default();
self.block_times_cache.write().set_time_attestable(
block_root,
signed_block.slot(),
attestable_timestamp,
)
}
} else {
warn!(
Expand Down Expand Up @@ -3885,25 +3905,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}

// Do not store metrics if the block was > 4 slots old, this helps prevent noise during
// sync.
if block_delay_total < self.slot_clock.slot_duration() * 4 {
// Observe the delay between when we observed the block and when we imported it.
let block_delays = self.block_times_cache.read().get_block_delays(
block_root,
self.slot_clock
.start_of(current_slot)
.unwrap_or_else(|| Duration::from_secs(0)),
);

metrics::observe_duration(
&metrics::BEACON_BLOCK_IMPORTED_OBSERVED_DELAY_TIME,
block_delays
.imported
.unwrap_or_else(|| Duration::from_secs(0)),
);
}

if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_block_subscribers() {
event_handler.register(EventKind::Block(SseBlock {
Expand Down
42 changes: 34 additions & 8 deletions beacon_node/beacon_chain/src/blob_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use merkle_proof::MerkleTreeError;
use slog::{debug, warn};
use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
use std::time::Duration;
use tree_hash::TreeHash;
use types::blob_sidecar::BlobIdentifier;
use types::{
Expand Down Expand Up @@ -214,7 +215,10 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
/// Construct a `GossipVerifiedBlob` that is assumed to be valid.
///
/// The `seen_timestamp` is set to zero since no real observation time exists;
/// intended for testing only.
pub fn __assumed_valid(blob: Arc<BlobSidecar<T::EthSpec>>) -> Self {
    Self {
        block_root: blob.block_root(),
        blob: KzgVerifiedBlob {
            blob,
            seen_timestamp: Duration::from_secs(0),
        },
    }
}
pub fn id(&self) -> BlobIdentifier {
Expand Down Expand Up @@ -260,6 +264,8 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
#[ssz(struct_behaviour = "transparent")]
pub struct KzgVerifiedBlob<E: EthSpec> {
    /// The blob sidecar that has passed KZG verification.
    blob: Arc<BlobSidecar<E>>,
    /// Time (duration since the UNIX epoch) at which the blob was seen.
    /// Excluded from the SSZ encoding, so it takes its default (zero) on decode.
    #[ssz(skip_serializing, skip_deserializing)]
    seen_timestamp: Duration,
}

impl<E: EthSpec> PartialOrd for KzgVerifiedBlob<E> {
Expand All @@ -275,8 +281,12 @@ impl<E: EthSpec> Ord for KzgVerifiedBlob<E> {
}

impl<E: EthSpec> KzgVerifiedBlob<E> {
pub fn new(blob: Arc<BlobSidecar<E>>, kzg: &Kzg) -> Result<Self, KzgError> {
verify_kzg_for_blob(blob, kzg)
pub fn new(
blob: Arc<BlobSidecar<E>>,
kzg: &Kzg,
seen_timestamp: Duration,
) -> Result<Self, KzgError> {
verify_kzg_for_blob(blob, kzg, seen_timestamp)
}
pub fn to_blob(self) -> Arc<BlobSidecar<E>> {
self.blob
Expand All @@ -294,12 +304,18 @@ impl<E: EthSpec> KzgVerifiedBlob<E> {
/// Index of this blob within its block's blob list.
pub fn blob_index(&self) -> u64 {
    self.blob.index
}
/// Time (duration since the UNIX epoch) at which this blob was seen.
pub fn seen_timestamp(&self) -> Duration {
    self.seen_timestamp
}
/// Construct a `KzgVerifiedBlob` that is assumed to be valid.
///
/// This should ONLY be used for testing.
#[cfg(test)]
pub fn __assumed_valid(blob: Arc<BlobSidecar<E>>) -> Self {
    Self {
        blob,
        // Tests have no meaningful observation time; use the epoch (zero).
        seen_timestamp: Duration::from_secs(0),
    }
}
}

Expand All @@ -309,9 +325,13 @@ impl<E: EthSpec> KzgVerifiedBlob<E> {
/// Run KZG validation on `blob` and, if it passes, wrap it together with
/// `seen_timestamp` into a `KzgVerifiedBlob`.
///
/// Returns the KZG error unchanged if validation fails.
pub fn verify_kzg_for_blob<E: EthSpec>(
    blob: Arc<BlobSidecar<E>>,
    kzg: &Kzg,
    seen_timestamp: Duration,
) -> Result<KzgVerifiedBlob<E>, KzgError> {
    validate_blob::<E>(kzg, &blob.blob, blob.kzg_commitment, blob.kzg_proof)?;
    Ok(KzgVerifiedBlob {
        blob,
        seen_timestamp,
    })
}

pub struct KzgVerifiedBlobList<E: EthSpec> {
Expand All @@ -322,13 +342,17 @@ impl<E: EthSpec> KzgVerifiedBlobList<E> {
/// Batch-verify the KZG proofs of `blob_list`, tagging every blob with the
/// same `seen_timestamp`.
///
/// Returns the KZG error unchanged if batch verification fails.
pub fn new<I: IntoIterator<Item = Arc<BlobSidecar<E>>>>(
    blob_list: I,
    kzg: &Kzg,
    seen_timestamp: Duration,
) -> Result<Self, KzgError> {
    let blobs = blob_list.into_iter().collect::<Vec<_>>();
    // Batch verification is cheaper than verifying each blob individually.
    verify_kzg_for_blob_list(blobs.iter(), kzg)?;
    Ok(Self {
        verified_blobs: blobs
            .into_iter()
            .map(|blob| KzgVerifiedBlob {
                blob,
                seen_timestamp,
            })
            .collect(),
    })
}
Expand Down Expand Up @@ -374,6 +398,8 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
let blob_epoch = blob_slot.epoch(T::EthSpec::slots_per_epoch());
let signed_block_header = &blob_sidecar.signed_block_header;

let seen_timestamp = chain.slot_clock.now_duration().unwrap_or_default();

// This condition is not possible if we have received the blob from the network
// since we only subscribe to `MaxBlobsPerBlock` subnets over gossip network.
// We include this check only for completeness.
Expand Down Expand Up @@ -641,8 +667,8 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
.kzg
.as_ref()
.ok_or(GossipBlobError::KzgNotInitialized)?;
let kzg_verified_blob =
KzgVerifiedBlob::new(blob_sidecar, kzg).map_err(GossipBlobError::KzgError)?;
let kzg_verified_blob = KzgVerifiedBlob::new(blob_sidecar, kzg, seen_timestamp)
.map_err(GossipBlobError::KzgError)?;

Ok(GossipVerifiedBlob {
block_root,
Expand Down
89 changes: 88 additions & 1 deletion beacon_node/beacon_chain/src/block_times_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,35 @@ type BlockRoot = Hash256;
/// Raw timestamps (durations since the UNIX epoch) recorded for a block as it
/// moves through processing. All fields are `None` until the corresponding
/// event is observed.
#[derive(Clone, Default)]
pub struct Timestamps {
    /// When the block itself was first observed.
    pub observed: Option<Duration>,
    /// When the last of the block's blobs was observed.
    pub all_blobs_observed: Option<Duration>,
    /// When execution-layer verification of the block completed.
    pub execution_time: Option<Duration>,
    /// When the block became attestable (e.g. inserted into the early-attester cache).
    pub attestable: Option<Duration>,
    /// When the block was imported.
    pub imported: Option<Duration>,
    /// When the block was set as the head.
    pub set_as_head: Option<Duration>,
}

// Helps arrange delay data so it is more relevant to metrics.
// Helps arrange delay data so it is more relevant to metrics.
#[derive(Debug, Default)]
pub struct BlockDelays {
    /// Time after start of slot we saw the block.
    pub observed: Option<Duration>,
    /// The time after the start of the slot we saw all blobs.
    pub all_blobs_observed: Option<Duration>,
    /// The time it took to get verification from the EL for the block.
    pub execution_time: Option<Duration>,
    /// The delay from the start of the slot before the block became available.
    ///
    /// Equal to max(`observed + execution_time`, `all_blobs_observed`).
    pub available: Option<Duration>,
    /// Time after `available`.
    pub attestable: Option<Duration>,
    /// ALSO time after `available`.
    ///
    /// We need to use `available` again rather than `attestable` to handle the case where the block
    /// does not get added to the early-attester cache.
    pub imported: Option<Duration>,
    /// Time after `imported`.
    pub set_as_head: Option<Duration>,
}

Expand All @@ -35,14 +55,34 @@ impl BlockDelays {
let observed = times
.observed
.and_then(|observed_time| observed_time.checked_sub(slot_start_time));
let all_blobs_observed = times
.all_blobs_observed
.and_then(|all_blobs_observed| all_blobs_observed.checked_sub(slot_start_time));
let execution_time = times
.execution_time
.and_then(|execution_time| execution_time.checked_sub(times.observed?));
// Duration since UNIX epoch at which block became available.
let available_time = times.execution_time.map(|execution_time| {
std::cmp::max(execution_time, times.all_blobs_observed.unwrap_or_default())
});
// Duration from the start of the slot until the block became available.
let available_delay =
available_time.and_then(|available_time| available_time.checked_sub(slot_start_time));
let attestable = times
.attestable
.and_then(|attestable_time| attestable_time.checked_sub(slot_start_time));
let imported = times
.imported
.and_then(|imported_time| imported_time.checked_sub(times.observed?));
.and_then(|imported_time| imported_time.checked_sub(available_time?));
let set_as_head = times
.set_as_head
.and_then(|set_as_head_time| set_as_head_time.checked_sub(times.imported?));
BlockDelays {
observed,
all_blobs_observed,
execution_time,
available: available_delay,
attestable,
imported,
set_as_head,
}
Expand Down Expand Up @@ -109,6 +149,53 @@ impl BlockTimesCache {
}
}

/// Record the timestamp at which a blob for `block_root` was observed.
///
/// Keeps the *latest* timestamp seen, so once every blob has arrived the
/// stored `all_blobs_observed` value is the moment the full set became
/// available.
pub fn set_time_blob_observed(
    &mut self,
    block_root: BlockRoot,
    slot: Slot,
    timestamp: Duration,
) {
    let block_times = self
        .cache
        .entry(block_root)
        .or_insert_with(|| BlockTimesCacheValue::new(slot));
    // Take the maximum (unlike the other setters, which keep the minimum):
    // we care about the arrival of the LAST blob, not the first.
    if block_times
        .timestamps
        .all_blobs_observed
        .map_or(true, |prev| timestamp > prev)
    {
        block_times.timestamps.all_blobs_observed = Some(timestamp);
    }
}

/// Record the timestamp at which execution-layer verification of the block
/// completed, keeping the earliest value seen for `block_root`.
pub fn set_execution_time(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
    let entry = self
        .cache
        .entry(block_root)
        .or_insert_with(|| BlockTimesCacheValue::new(slot));
    // Only store the new timestamp if none exists yet or it is earlier.
    let is_earlier = match entry.timestamps.execution_time {
        None => true,
        Some(prev) => timestamp < prev,
    };
    if is_earlier {
        entry.timestamps.execution_time = Some(timestamp);
    }
}

/// Record the timestamp at which the block became attestable, keeping the
/// earliest value seen for `block_root`.
pub fn set_time_attestable(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
    let entry = self
        .cache
        .entry(block_root)
        .or_insert_with(|| BlockTimesCacheValue::new(slot));
    match entry.timestamps.attestable {
        // An earlier timestamp already exists; keep it.
        Some(prev) if prev <= timestamp => {}
        // First observation, or a strictly earlier one: store it.
        _ => entry.timestamps.attestable = Some(timestamp),
    }
}

pub fn set_time_imported(&mut self, block_root: BlockRoot, slot: Slot, timestamp: Duration) {
let block_times = self
.cache
Expand Down
3 changes: 1 addition & 2 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,7 @@ type PayloadVerificationHandle<E> =
/// - Parent is known
/// - Signatures
/// - State root check
/// - Per block processing
/// - Blobs sidecar has been validated if present
/// - Block processing
///
/// Note: a `ExecutionPendingBlock` is not _forever_ valid to be imported, it may later become invalid
/// due to finality or some other event. A `ExecutionPendingBlock` should be imported into the
Expand Down
Loading
Loading