Skip to content

Commit

Permalink
Move processing cache out of DA (#5420)
Browse files Browse the repository at this point in the history
* Move processing cache out of DA

* Merge branch 'sigp/unstable' into non-da-processing-cach

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into non-da-processing-cache

* remove unused file, remove outdated TODO, add is_deneb check to missing blob id calculations

* Merge branch 'unstable' of https://github.com/sigp/lighthouse into non-da-processing-cache

* fix lints
  • Loading branch information
dapplion authored Apr 10, 2024
1 parent b1f9751 commit 30dc260
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 209 deletions.
11 changes: 8 additions & 3 deletions beacon_node/beacon_chain/src/beacon_block_streamer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes};
use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1};
use slog::{crit, debug, Logger};
use std::collections::HashMap;
Expand Down Expand Up @@ -412,8 +412,13 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
if self.check_caches == CheckCaches::Yes {
self.beacon_chain
.data_availability_checker
.get_block(&root)
.reqresp_pre_import_cache
.read()
.get(&root)
.map(|block| {
metrics::inc_counter(&metrics::BEACON_REQRESP_PRE_IMPORT_CACHE_HITS);
block.clone()
})
.or(self.beacon_chain.early_attester_cache.get_block(root))
} else {
None
Expand Down
17 changes: 10 additions & 7 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,9 @@ pub type BeaconStore<T> = Arc<
>,
>;

/// Cache gossip verified blocks to serve over ReqResp before they are imported
type ReqRespPreImportCache<E> = HashMap<Hash256, Arc<SignedBeaconBlock<E>>>;

/// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block
/// operations and chooses a canonical head.
pub struct BeaconChain<T: BeaconChainTypes> {
Expand Down Expand Up @@ -461,6 +464,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub(crate) attester_cache: Arc<AttesterCache>,
/// A cache used when producing attestations whilst the head block is still being imported.
pub early_attester_cache: EarlyAttesterCache<T::EthSpec>,
/// Cache gossip verified blocks to serve over ReqResp before they are imported
pub reqresp_pre_import_cache: Arc<RwLock<ReqRespPreImportCache<T::EthSpec>>>,
/// A cache used to keep track of various block timings.
pub block_times_cache: Arc<RwLock<BlockTimesCache>>,
/// A cache used to track pre-finalization block roots for quick rejection.
Expand Down Expand Up @@ -2898,8 +2903,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

self.data_availability_checker
.notify_gossip_blob(block_root, &blob);
let r = self.check_gossip_blob_availability_and_import(blob).await;
self.remove_notified(&block_root, r)
}
Expand Down Expand Up @@ -2932,8 +2935,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

self.data_availability_checker
.notify_rpc_blobs(block_root, &blobs);
let r = self
.check_rpc_blob_availability_and_import(slot, block_root, blobs)
.await;
Expand All @@ -2950,7 +2951,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let has_missing_components =
matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)));
if !has_missing_components {
self.data_availability_checker.remove_notified(block_root);
self.reqresp_pre_import_cache.write().remove(block_root);
}
r
}
Expand All @@ -2963,8 +2964,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
unverified_block: B,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
self.data_availability_checker
.notify_block(block_root, unverified_block.block_cloned());
self.reqresp_pre_import_cache
.write()
.insert(block_root, unverified_block.block_cloned());

let r = self
.process_block(block_root, unverified_block, notify_execution_layer, || {
Ok(())
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,7 @@ where
validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache),
attester_cache: <_>::default(),
early_attester_cache: <_>::default(),
reqresp_pre_import_cache: <_>::default(),
light_client_server_cache: LightClientServerCache::new(),
light_client_server_tx: self.light_client_server_tx,
shutdown_sender: self
Expand Down
98 changes: 24 additions & 74 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ pub use crate::data_availability_checker::availability_view::{
};
pub use crate::data_availability_checker::child_components::ChildComponents;
use crate::data_availability_checker::overflow_lru_cache::OverflowLRUCache;
use crate::data_availability_checker::processing_cache::ProcessingCache;
use crate::{BeaconChain, BeaconChainTypes, BeaconStore};
use kzg::Kzg;
use parking_lot::RwLock;
pub use processing_cache::ProcessingComponents;
use slasher::test_utils::E;
use slog::{debug, error, Logger};
use slot_clock::SlotClock;
Expand All @@ -21,15 +18,13 @@ use std::fmt::Debug;
use std::num::NonZeroUsize;
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::beacon_block_body::KzgCommitmentOpts;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};

mod availability_view;
mod child_components;
mod error;
mod overflow_lru_cache;
mod processing_cache;
mod state_lru_cache;

pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
Expand All @@ -50,7 +45,6 @@ pub const STATE_LRU_CAPACITY: usize = STATE_LRU_CAPACITY_NON_ZERO.get();
/// `DataAvailabilityChecker` is responsible for KZG verification of block components as well as
/// checking whether a "availability check" is required at all.
pub struct DataAvailabilityChecker<T: BeaconChainTypes> {
processing_cache: RwLock<ProcessingCache<T::EthSpec>>,
availability_cache: Arc<OverflowLRUCache<T>>,
slot_clock: T::SlotClock,
kzg: Option<Arc<Kzg>>,
Expand Down Expand Up @@ -89,7 +83,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
) -> Result<Self, AvailabilityCheckError> {
let overflow_cache = OverflowLRUCache::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?;
Ok(Self {
processing_cache: <_>::default(),
availability_cache: Arc::new(overflow_cache),
slot_clock,
log: log.clone(),
Expand All @@ -98,25 +91,33 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}

/// Checks if the given block root is cached.
/// Checks if the block root is currenlty in the availability cache awaiting processing because
/// of missing components.
pub fn has_block(&self, block_root: &Hash256) -> bool {
self.processing_cache.read().has_block(block_root)
self.availability_cache.has_block(block_root)
}

/// Get the processing info for a block.
pub fn get_processing_components(
&self,
block_root: Hash256,
) -> Option<ProcessingComponents<T::EthSpec>> {
self.processing_cache.read().get(&block_root).cloned()
pub fn get_missing_blob_ids_with(&self, block_root: Hash256) -> MissingBlobs {
self.availability_cache
.with_pending_components(&block_root, |pending_components| match pending_components {
Some(pending_components) => self.get_missing_blob_ids(
block_root,
pending_components
.get_cached_block()
.as_ref()
.map(|b| b.as_block()),
&pending_components.verified_blobs,
),
None => MissingBlobs::new_without_block(block_root, self.is_deneb()),
})
}

/// If there's no block, all possible ids will be returned that don't exist in the given blobs.
/// If there no blobs, all possible ids will be returned.
pub fn get_missing_blob_ids<V>(
&self,
block_root: Hash256,
block: &Option<Arc<SignedBeaconBlock<T::EthSpec>>>,
block: Option<&SignedBeaconBlock<T::EthSpec>>,
blobs: &FixedVector<Option<V>, <T::EthSpec as EthSpec>::MaxBlobsPerBlock>,
) -> MissingBlobs {
let Some(current_slot) = self.slot_clock.now_or_genesis() else {
Expand All @@ -132,10 +133,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
if self.da_check_required_for_epoch(current_epoch) {
match block {
Some(cached_block) => {
let block_commitments = cached_block.get_commitments();
let block_commitments_len = cached_block
.message()
.body()
.blob_kzg_commitments()
.map(|v| v.len())
.unwrap_or(0);
let blob_ids = blobs
.iter()
.take(block_commitments.len())
.take(block_commitments_len)
.enumerate()
.filter_map(|(index, blob_commitment_opt)| {
blob_commitment_opt.is_none().then_some(BlobIdentifier {
Expand Down Expand Up @@ -163,14 +169,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.peek_blob(blob_id)
}

/// Get a block from the availability cache. Includes any blocks we are currently processing.
pub fn get_block(&self, block_root: &Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
self.processing_cache
.read()
.get(block_root)
.and_then(|cached| cached.block.clone())
}

/// Put a list of blobs received via RPC into the availability cache. This performs KZG
/// verification on the blobs in the list.
pub fn put_rpc_blobs(
Expand Down Expand Up @@ -323,52 +321,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block.num_expected_blobs() > 0 && self.da_check_required_for_epoch(block.epoch())
}

/// Adds a block to the processing cache. This block's commitments are unverified but caching
/// them here is useful to avoid duplicate downloads of blocks, as well as understanding
/// our blob download requirements. We will also serve this over RPC.
pub fn notify_block(&self, block_root: Hash256, block: Arc<SignedBeaconBlock<T::EthSpec>>) {
self.processing_cache
.write()
.entry(block_root)
.or_default()
.merge_block(block);
}

/// Add a single blob commitment to the processing cache. This commitment is unverified but caching
/// them here is useful to avoid duplicate downloads of blobs, as well as understanding
/// our block and blob download requirements.
pub fn notify_gossip_blob(&self, block_root: Hash256, blob: &GossipVerifiedBlob<T>) {
let index = blob.index();
let commitment = blob.kzg_commitment();
self.processing_cache
.write()
.entry(block_root)
.or_default()
.merge_single_blob(index as usize, commitment);
}

/// Adds blob commitments to the processing cache. These commitments are unverified but caching
/// them here is useful to avoid duplicate downloads of blobs, as well as understanding
/// our block and blob download requirements.
pub fn notify_rpc_blobs(&self, block_root: Hash256, blobs: &FixedBlobSidecarList<T::EthSpec>) {
let mut commitments = KzgCommitmentOpts::<T::EthSpec>::default();
for blob in blobs.iter().flatten() {
if let Some(commitment) = commitments.get_mut(blob.index as usize) {
*commitment = Some(blob.kzg_commitment);
}
}
self.processing_cache
.write()
.entry(block_root)
.or_default()
.merge_blobs(commitments);
}

/// Clears the block and all blobs from the processing cache for a give root if they exist.
pub fn remove_notified(&self, block_root: &Hash256) {
self.processing_cache.write().remove(block_root)
}

/// The epoch at which we require a data availability check in block processing.
/// `None` if the `Deneb` fork is disabled.
pub fn data_availability_boundary(&self) -> Option<Epoch> {
Expand Down Expand Up @@ -410,7 +362,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
/// Collects metrics from the data availability checker.
pub fn metrics(&self) -> DataAvailabilityCheckerMetrics {
DataAvailabilityCheckerMetrics {
processing_cache_size: self.processing_cache.read().len(),
num_store_entries: self.availability_cache.num_store_entries(),
state_cache_size: self.availability_cache.state_cache_size(),
block_cache_size: self.availability_cache.block_cache_size(),
Expand All @@ -420,7 +371,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {

/// Helper struct to group data availability checker metrics.
pub struct DataAvailabilityCheckerMetrics {
pub processing_cache_size: usize,
pub num_store_entries: usize,
pub state_cache_size: usize,
pub block_cache_size: usize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use super::state_lru_cache::DietAvailabilityPendingExecutedBlock;
use crate::blob_verification::KzgVerifiedBlob;
use crate::block_verification_types::AsBlock;
use crate::data_availability_checker::overflow_lru_cache::PendingComponents;
use crate::data_availability_checker::ProcessingComponents;
use kzg::KzgCommitment;
use ssz_types::FixedVector;
use std::sync::Arc;
Expand Down Expand Up @@ -178,14 +177,6 @@ macro_rules! impl_availability_view {
};
}

impl_availability_view!(
ProcessingComponents,
Arc<SignedBeaconBlock<E>>,
KzgCommitment,
block,
blob_commitments
);

impl_availability_view!(
PendingComponents,
DietAvailabilityPendingExecutedBlock<E>,
Expand Down Expand Up @@ -293,32 +284,6 @@ pub mod tests {
(block, blobs, invalid_blobs)
}

type ProcessingViewSetup<E> = (
Arc<SignedBeaconBlock<E>>,
FixedVector<Option<KzgCommitment>, <E as EthSpec>::MaxBlobsPerBlock>,
FixedVector<Option<KzgCommitment>, <E as EthSpec>::MaxBlobsPerBlock>,
);

pub fn setup_processing_components(
block: SignedBeaconBlock<E>,
valid_blobs: FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
invalid_blobs: FixedVector<Option<Arc<BlobSidecar<E>>>, <E as EthSpec>::MaxBlobsPerBlock>,
) -> ProcessingViewSetup<E> {
let blobs = FixedVector::from(
valid_blobs
.iter()
.map(|blob_opt| blob_opt.as_ref().map(|blob| blob.kzg_commitment))
.collect::<Vec<_>>(),
);
let invalid_blobs = FixedVector::from(
invalid_blobs
.iter()
.map(|blob_opt| blob_opt.as_ref().map(|blob| blob.kzg_commitment))
.collect::<Vec<_>>(),
);
(Arc::new(block), blobs, invalid_blobs)
}

type PendingComponentsSetup<E> = (
DietAvailabilityPendingExecutedBlock<E>,
FixedVector<Option<KzgVerifiedBlob<E>>, <E as EthSpec>::MaxBlobsPerBlock>,
Expand Down Expand Up @@ -490,13 +455,6 @@ pub mod tests {
};
}

generate_tests!(
processing_components_tests,
ProcessingComponents::<E>,
kzg_commitments,
processing_blobs,
setup_processing_components
);
generate_tests!(
pending_components_tests,
PendingComponents<E>,
Expand Down
Loading

0 comments on commit 30dc260

Please sign in to comment.