Skip to content

Commit

Permalink
Add custody requirement checks to availability_cache.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmygchen committed Apr 16, 2024
1 parent 898c249 commit 54c4afc
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 93 deletions.
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
let overflow_cache = OverflowLRUCache::new(
OVERFLOW_LRU_CAPACITY,
store,
custody_column_count,
Some(custody_column_count),
spec.clone(),
)?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256};
pub struct PendingComponents<E: EthSpec> {
pub block_root: Hash256,
pub verified_blobs: FixedVector<Option<KzgVerifiedBlob<E>>, E::MaxBlobsPerBlock>,
pub verified_data_columns: FixedVector<Option<KzgVerifiedDataColumn<E>>, E::DataColumnCount>,
pub verified_data_columns: VariableList<KzgVerifiedDataColumn<E>, E::DataColumnCount>,
pub executed_block: Option<DietAvailabilityPendingExecutedBlock<E>>,
}

Expand All @@ -73,10 +73,20 @@ impl<E: EthSpec> PendingComponents<E> {
&self.verified_blobs
}

/// Returns an immutable reference to the fixed vector of cached data columns.
/// Returns an immutable reference to the cached data column with the given index, or `None`
/// if no cached data column has that index.
pub fn get_cached_data_column(
&self,
data_column_index: u64,
) -> Option<&KzgVerifiedDataColumn<E>> {
// Linear scan: entries are matched by their own `data_column_index()`, not by their
// position in the list.
self.verified_data_columns
.iter()
.find(|d| d.data_column_index() == data_column_index)
}

/// Returns an immutable reference to the list of cached data columns.
pub fn get_cached_data_columns(
&self,
) -> &FixedVector<Option<KzgVerifiedDataColumn<E>>, E::DataColumnCount> {
) -> &VariableList<KzgVerifiedDataColumn<E>, E::DataColumnCount> {
&self.verified_data_columns
}

Expand All @@ -95,7 +105,7 @@ impl<E: EthSpec> PendingComponents<E> {
/// Returns a mutable reference to the list of cached data columns.
pub fn get_cached_data_columns_mut(
&mut self,
) -> &mut FixedVector<Option<KzgVerifiedDataColumn<E>>, E::DataColumnCount> {
) -> &mut VariableList<KzgVerifiedDataColumn<E>, E::DataColumnCount> {
&mut self.verified_data_columns
}

Expand All @@ -111,16 +121,15 @@ impl<E: EthSpec> PendingComponents<E> {
.unwrap_or(false)
}

/// Checks if a data column exists at the given index in the cache.
/// Checks if a data column of a given index exists in the cache.
///
/// Returns:
/// - `true` if a data column exists at the given index.
/// - `true` if a data column for the given index exists.
/// - `false` otherwise.
fn data_column_exists(&self, data_colum_index: usize) -> bool {
fn data_column_exists(&self, data_colum_index: u64) -> bool {
self.get_cached_data_columns()
.get(data_colum_index)
.map(|d| d.is_some())
.unwrap_or(false)
.iter()
.any(|d| d.data_column_index() == data_colum_index)
}

/// Returns the number of blobs that are expected to be present. Returns `None` if we don't have a
Expand All @@ -138,6 +147,11 @@ impl<E: EthSpec> PendingComponents<E> {
self.get_cached_blobs().iter().flatten().count()
}

/// Returns the number of data columns that have been received and are stored in the cache.
///
/// Every entry of the list returned by `get_cached_data_columns` is counted.
pub fn num_received_data_columns(&self) -> usize {
self.get_cached_data_columns().iter().count()
}

/// Inserts a block into the cache.
pub fn insert_block(&mut self, block: DietAvailabilityPendingExecutedBlock<E>) {
*self.get_cached_block_mut() = Some(block)
Expand Down Expand Up @@ -185,40 +199,18 @@ impl<E: EthSpec> PendingComponents<E> {
}
}

/// Inserts a data column at a specific index in the cache.
///
/// Existing data column at the index will be replaced.
fn insert_data_column_at_index(
&mut self,
data_column_index: usize,
data_column: KzgVerifiedDataColumn<E>,
) {
if let Some(b) = self
.get_cached_data_columns_mut()
.get_mut(data_column_index)
{
*b = Some(data_column);
}
}

/// Merges a given set of data columns into the cache.
///
/// A data column is only inserted if no data column with the same index is already cached.
/// Commitments are not yet checked against the block (see the `TODO(das)` in the body).
fn merge_data_columns(
fn merge_data_columns<I: IntoIterator<Item = KzgVerifiedDataColumn<E>>>(
&mut self,
data_columns: FixedVector<Option<KzgVerifiedDataColumn<E>>, E::DataColumnCount>,
) {
for (index, data_column) in data_columns.iter().cloned().enumerate() {
let Some(data_column) = data_column else {
continue;
};
kzg_verified_data_columns: I,
) -> Result<(), AvailabilityCheckError> {
for data_column in kzg_verified_data_columns {
// TODO(das): Add equivalent checks for data columns if necessary
if !self.data_column_exists(index) {
self.insert_data_column_at_index(index, data_column)
if !self.data_column_exists(data_column.data_column_index()) {
self.verified_data_columns.push(data_column)?;
}
}
Ok(())
}

/// Inserts a new block and revalidates the existing blobs against it.
Expand All @@ -230,13 +222,24 @@ impl<E: EthSpec> PendingComponents<E> {
self.merge_blobs(reinsert);
}

/// Checks if the block and all of its expected blobs are available in the cache.
/// Checks if the block and all of its expected blobs or custody columns (post-PeerDAS) are
/// available in the cache.
///
/// Returns `true` if both the block exists and the number of received blobs matches the number
/// of expected blobs.
pub fn is_available(&self) -> bool {
/// Returns `true` if both the block exists and the number of received blobs / custody columns
/// matches the number of expected blobs / custody columns.
pub fn is_available(&self, custody_column_count: Option<usize>) -> bool {
if let Some(num_expected_blobs) = self.num_expected_blobs() {
// We don't expect any data columns if there's no blobs
let num_expected_data_columns = if num_expected_blobs > 0 {
custody_column_count.unwrap_or(0)
} else {
0
};

// TODO(das): migrate from checking blob count to checking only column count post
// EIP-7594 fork.
num_expected_blobs == self.num_received_blobs()
&& num_expected_data_columns == self.num_received_data_columns()
} else {
false
}
Expand All @@ -247,7 +250,7 @@ impl<E: EthSpec> PendingComponents<E> {
Self {
block_root,
verified_blobs: FixedVector::default(),
verified_data_columns: FixedVector::default(),
verified_data_columns: VariableList::default(),
executed_block: None,
}
}
Expand Down Expand Up @@ -289,8 +292,7 @@ impl<E: EthSpec> PendingComponents<E> {
// TODO(das) Do we need a check here for number of expected custody columns?
let verified_data_columns = verified_data_columns
.into_iter()
.cloned()
.filter_map(|d| d.map(|d| d.to_data_column()))
.map(|d| d.to_data_column())
.collect::<Vec<_>>()
.into();

Expand Down Expand Up @@ -329,16 +331,15 @@ impl<E: EthSpec> PendingComponents<E> {
});
}
}
for maybe_data_column in self.verified_data_columns.iter() {
if maybe_data_column.is_some() {
return maybe_data_column.as_ref().map(|kzg_verified_data_column| {
kzg_verified_data_column
.as_data_column()
.slot()
.epoch(E::slots_per_epoch())
});
}

if let Some(kzg_verified_data_column) = self.verified_data_columns.first() {
let epoch = kzg_verified_data_column
.as_data_column()
.slot()
.epoch(E::slots_per_epoch());
return Some(epoch);
}

None
})
}
Expand Down Expand Up @@ -427,10 +428,7 @@ impl<T: BeaconChainTypes> OverflowStore<T> {
.put_bytes(col.as_str(), &key.as_ssz_bytes(), &blob.as_ssz_bytes())?
}

for data_column in Vec::from(pending_components.verified_data_columns)
.into_iter()
.flatten()
{
for data_column in pending_components.verified_data_columns.into_iter() {
let key = OverflowKey::from_data_column_id::<T::EthSpec>(DataColumnIdentifier {
block_root,
index: data_column.data_column_index(),
Expand Down Expand Up @@ -476,15 +474,14 @@ impl<T: BeaconChainTypes> OverflowStore<T> {
.ok_or(AvailabilityCheckError::BlobIndexInvalid(index as u64))? =
Some(KzgVerifiedBlob::from_ssz_bytes(value_bytes.as_slice())?);
}
OverflowKey::DataColumn(_, index) => {
*maybe_pending_components
// TODO(das): Remove unused index
OverflowKey::DataColumn(_, _index) => {
let data_column =
KzgVerifiedDataColumn::from_ssz_bytes(value_bytes.as_slice())?;
maybe_pending_components
.get_or_insert_with(|| PendingComponents::empty(block_root))
.verified_data_columns
.get_mut(index as usize)
.ok_or(AvailabilityCheckError::DataColumnIndexInvalid(index as u64))? =
Some(KzgVerifiedDataColumn::from_ssz_bytes(
value_bytes.as_slice(),
)?);
.get_cached_data_columns_mut()
.push(data_column)?;
}
}
}
Expand Down Expand Up @@ -601,12 +598,7 @@ impl<T: BeaconChainTypes> Critical<T> {
) -> Result<Option<Arc<DataColumnSidecar<T::EthSpec>>>, AvailabilityCheckError> {
if let Some(pending_components) = self.in_memory.peek(&data_column_id.block_root) {
Ok(pending_components
.verified_data_columns
.get(data_column_id.index as usize)
.ok_or(AvailabilityCheckError::DataColumnIndexInvalid(
data_column_id.index,
))?
.as_ref()
.get_cached_data_column(data_column_id.index)
.map(|data_column| data_column.clone_data_column()))
} else {
Ok(None)
Expand Down Expand Up @@ -687,13 +679,17 @@ pub struct OverflowLRUCache<T: BeaconChainTypes> {
maintenance_lock: Mutex<()>,
/// The capacity of the LRU cache
capacity: NonZeroUsize,
/// The number of data columns the node is custodying.
// FIXME(das): Using `Option` as temporary workaround to disable custody requirement checks in
// tests. To be removed once we implement proper fork / epoch transition.
custody_column_count: Option<usize>,
}

impl<T: BeaconChainTypes> OverflowLRUCache<T> {
pub fn new(
capacity: NonZeroUsize,
beacon_store: BeaconStore<T>,
_custody_column_count: usize,
custody_column_count: Option<usize>,
spec: ChainSpec,
) -> Result<Self, AvailabilityCheckError> {
let overflow_store = OverflowStore(beacon_store.clone());
Expand All @@ -705,6 +701,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
state_cache: StateLRUCache::new(beacon_store, spec),
maintenance_lock: Mutex::new(()),
capacity,
custody_column_count,
})
}

Expand Down Expand Up @@ -760,16 +757,6 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
block_root: Hash256,
kzg_verified_data_columns: I,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let mut fixed_data_columns = FixedVector::default();

for data_column in kzg_verified_data_columns {
if let Some(data_column_opt) =
fixed_data_columns.get_mut(data_column.data_column_index() as usize)
{
*data_column_opt = Some(data_column);
}
}

let mut write_lock = self.critical.write();

// Grab existing entry or create a new entry.
Expand All @@ -778,12 +765,22 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
.unwrap_or_else(|| PendingComponents::empty(block_root));

// Merge in the data columns.
pending_components.merge_data_columns(fixed_data_columns);

write_lock.put_pending_components(block_root, pending_components, &self.overflow_store)?;
pending_components.merge_data_columns(kzg_verified_data_columns)?;

// TODO(das): Currently this does not change availability status and nor import yet.
Ok(Availability::MissingComponents(block_root))
if pending_components.is_available(self.custody_column_count) {
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
} else {
write_lock.put_pending_components(
block_root,
pending_components,
&self.overflow_store,
)?;
Ok(Availability::MissingComponents(block_root))
}
}

pub fn put_kzg_verified_blobs<I: IntoIterator<Item = KzgVerifiedBlob<T::EthSpec>>>(
Expand All @@ -809,7 +806,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
// Merge in the blobs.
pending_components.merge_blobs(fixed_blobs);

if pending_components.is_available() {
if pending_components.is_available(self.custody_column_count) {
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
Expand Down Expand Up @@ -848,7 +845,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
pending_components.merge_block(diet_executed_block);

// Check if we have all components and entire set is consistent.
if pending_components.is_available() {
if pending_components.is_available(self.custody_column_count) {
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
Expand Down Expand Up @@ -1377,12 +1374,13 @@ mod test {
let test_store = harness.chain.store.clone();
let capacity_non_zero = new_non_zero_usize(capacity);
let cache = Arc::new(
OverflowLRUCache::<T>::new(capacity_non_zero, test_store, spec.clone())
OverflowLRUCache::<T>::new(capacity_non_zero, test_store, None, spec.clone())
.expect("should create cache"),
);
(harness, cache, chain_db_path)
}

// TODO(das): add test for custody columns.
#[tokio::test]
async fn overflow_cache_test_insert_components() {
type E = MinimalEthSpec;
Expand Down Expand Up @@ -1881,6 +1879,7 @@ mod test {
let recovered_cache = OverflowLRUCache::<T>::new(
new_non_zero_usize(capacity),
harness.chain.store.clone(),
None,
harness.chain.spec.clone(),
)
.expect("should recover cache");
Expand Down
3 changes: 3 additions & 0 deletions book/src/help_bn.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ FLAGS:
server on localhost:5052 and import deposit logs from the execution node.
This is equivalent to `--http` on merge-ready networks, or `--http
--eth1` pre-merge
--subscribe-all-data-column-subnets Subscribe to all data column subnets and participate in data custody for
all columns. This will also advertise the beacon node as being long-lived
subscribed to all data column subnets.
--subscribe-all-subnets Subscribe to all subnets regardless of validator count. This will also
advertise the beacon node as being long-lived subscribed to all subnets.
--validator-monitor-auto Enables the automatic detection and monitoring of validators connected to
Expand Down

0 comments on commit 54c4afc

Please sign in to comment.