diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index d0f9bad31b1..fa2c6c9d645 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -93,7 +93,7 @@ impl DataAvailabilityChecker { let overflow_cache = OverflowLRUCache::new( OVERFLOW_LRU_CAPACITY, store, - custody_column_count, + Some(custody_column_count), spec.clone(), )?; diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 4c56d2eee16..d9f3f35ba72 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -56,7 +56,7 @@ use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256}; pub struct PendingComponents { pub block_root: Hash256, pub verified_blobs: FixedVector>, E::MaxBlobsPerBlock>, - pub verified_data_columns: FixedVector>, E::DataColumnCount>, + pub verified_data_columns: VariableList, E::DataColumnCount>, pub executed_block: Option>, } @@ -73,10 +73,20 @@ impl PendingComponents { &self.verified_blobs } - /// Returns an immutable reference to the fixed vector of cached data columns. + /// Returns a mutable reference to the cached data column. + pub fn get_cached_data_column( + &self, + data_column_index: u64, + ) -> Option<&KzgVerifiedDataColumn> { + self.verified_data_columns + .iter() + .find(|d| d.data_column_index() == data_column_index) + } + + /// Returns an immutable reference to the list of cached data columns. pub fn get_cached_data_columns( &self, - ) -> &FixedVector>, E::DataColumnCount> { + ) -> &VariableList, E::DataColumnCount> { &self.verified_data_columns } @@ -95,7 +105,7 @@ impl PendingComponents { /// Returns a mutable reference to the fixed vector of cached data columns. pub fn get_cached_data_columns_mut( &mut self, - ) -> &mut FixedVector>, E::DataColumnCount> { + ) -> &mut VariableList, E::DataColumnCount> { &mut self.verified_data_columns } @@ -111,16 +121,15 @@ impl PendingComponents { .unwrap_or(false) } - /// Checks if a data column exists at the given index in the cache. + /// Checks if a data column of a given index exists in the cache. /// /// Returns: - /// - `true` if a data column exists at the given index. + /// - `true` if a data column for the given index exists. /// - `false` otherwise. - fn data_column_exists(&self, data_colum_index: usize) -> bool { + fn data_column_exists(&self, data_colum_index: u64) -> bool { self.get_cached_data_columns() - .get(data_colum_index) - .map(|d| d.is_some()) - .unwrap_or(false) + .iter() + .any(|d| d.data_column_index() == data_colum_index) } /// Returns the number of blobs that are expected to be present. Returns `None` if we don't have a @@ -138,6 +147,11 @@ impl PendingComponents { self.get_cached_blobs().iter().flatten().count() } + /// Returns the number of data columns that have been received and are stored in the cache. + pub fn num_received_data_columns(&self) -> usize { + self.get_cached_data_columns().iter().count() + } + /// Inserts a block into the cache. pub fn insert_block(&mut self, block: DietAvailabilityPendingExecutedBlock) { *self.get_cached_block_mut() = Some(block) @@ -185,40 +199,18 @@ impl PendingComponents { } } - /// Inserts a data column at a specific index in the cache. - /// - /// Existing data column at the index will be replaced. - fn insert_data_column_at_index( - &mut self, - data_column_index: usize, - data_column: KzgVerifiedDataColumn, - ) { - if let Some(b) = self - .get_cached_data_columns_mut() - .get_mut(data_column_index) - { - *b = Some(data_column); - } - } - /// Merges a given set of data columns into the cache. - /// - /// Data columns are only inserted if: - /// 1. The data column entry at the index is empty and no block exists. - /// 2. The block exists and its commitments matches the data column's commitments. - fn merge_data_columns( + fn merge_data_columns>>( &mut self, - data_columns: FixedVector>, E::DataColumnCount>, - ) { - for (index, data_column) in data_columns.iter().cloned().enumerate() { - let Some(data_column) = data_column else { - continue; - }; + kzg_verified_data_columns: I, + ) -> Result<(), AvailabilityCheckError> { + for data_column in kzg_verified_data_columns { // TODO(das): Add equivalent checks for data columns if necessary - if !self.data_column_exists(index) { - self.insert_data_column_at_index(index, data_column) + if !self.data_column_exists(data_column.data_column_index()) { + self.verified_data_columns.push(data_column)?; } } + Ok(()) } /// Inserts a new block and revalidates the existing blobs against it. @@ -230,13 +222,24 @@ impl PendingComponents { self.merge_blobs(reinsert); } - /// Checks if the block and all of its expected blobs are available in the cache. + /// Checks if the block and all of its expected blobs or custody columns (post-PeerDAS) are + /// available in the cache. /// - /// Returns `true` if both the block exists and the number of received blobs matches the number - /// of expected blobs. - pub fn is_available(&self) -> bool { + /// Returns `true` if both the block exists and the number of received blobs / custody columns + /// matches the number of expected blobs / custody columns. + pub fn is_available(&self, custody_column_count: Option) -> bool { if let Some(num_expected_blobs) = self.num_expected_blobs() { + // We don't expect any data columns if there's no blobs + let num_expected_data_columns = if num_expected_blobs > 0 { + custody_column_count.unwrap_or(0) + } else { + 0 + }; + + // TODO(das): migrate from checking blob count to checking only column count post + // EIP-7594 fork. num_expected_blobs == self.num_received_blobs() + && num_expected_data_columns == self.num_received_data_columns() } else { false } @@ -247,7 +250,7 @@ impl PendingComponents { Self { block_root, verified_blobs: FixedVector::default(), - verified_data_columns: FixedVector::default(), + verified_data_columns: VariableList::default(), executed_block: None, } } @@ -289,8 +292,7 @@ impl PendingComponents { // TODO(das) Do we need a check here for number of expected custody columns? let verified_data_columns = verified_data_columns .into_iter() - .cloned() - .filter_map(|d| d.map(|d| d.to_data_column())) + .map(|d| d.to_data_column()) .collect::>() .into(); @@ -329,16 +331,15 @@ impl PendingComponents { }); } } - for maybe_data_column in self.verified_data_columns.iter() { - if maybe_data_column.is_some() { - return maybe_data_column.as_ref().map(|kzg_verified_data_column| { - kzg_verified_data_column - .as_data_column() - .slot() - .epoch(E::slots_per_epoch()) - }); - } + + if let Some(kzg_verified_data_column) = self.verified_data_columns.first() { + let epoch = kzg_verified_data_column + .as_data_column() + .slot() + .epoch(E::slots_per_epoch()); + return Some(epoch); } + None }) } @@ -427,10 +428,7 @@ impl OverflowStore { .put_bytes(col.as_str(), &key.as_ssz_bytes(), &blob.as_ssz_bytes())? } - for data_column in Vec::from(pending_components.verified_data_columns) - .into_iter() - .flatten() - { + for data_column in pending_components.verified_data_columns.into_iter() { let key = OverflowKey::from_data_column_id::(DataColumnIdentifier { block_root, index: data_column.data_column_index(), @@ -476,15 +474,14 @@ impl OverflowStore { .ok_or(AvailabilityCheckError::BlobIndexInvalid(index as u64))? = Some(KzgVerifiedBlob::from_ssz_bytes(value_bytes.as_slice())?); } - OverflowKey::DataColumn(_, index) => { - *maybe_pending_components + // TODO(das): Remove unused index + OverflowKey::DataColumn(_, _index) => { + let data_column = + KzgVerifiedDataColumn::from_ssz_bytes(value_bytes.as_slice())?; + maybe_pending_components .get_or_insert_with(|| PendingComponents::empty(block_root)) - .verified_data_columns - .get_mut(index as usize) - .ok_or(AvailabilityCheckError::DataColumnIndexInvalid(index as u64))? = - Some(KzgVerifiedDataColumn::from_ssz_bytes( - value_bytes.as_slice(), - )?); + .get_cached_data_columns_mut() + .push(data_column)?; } } } @@ -601,12 +598,7 @@ impl Critical { ) -> Result>>, AvailabilityCheckError> { if let Some(pending_components) = self.in_memory.peek(&data_column_id.block_root) { Ok(pending_components - .verified_data_columns - .get(data_column_id.index as usize) - .ok_or(AvailabilityCheckError::DataColumnIndexInvalid( - data_column_id.index, - ))? - .as_ref() + .get_cached_data_column(data_column_id.index) .map(|data_column| data_column.clone_data_column())) } else { Ok(None) @@ -687,13 +679,17 @@ pub struct OverflowLRUCache { maintenance_lock: Mutex<()>, /// The capacity of the LRU cache capacity: NonZeroUsize, + /// The number of data columns the node is custodying. + // FIXME(das): Using `Option` as temporary workaround to disable custody requirement checks in + // tests. To be removed once we implement proper fork / epoch transition. + custody_column_count: Option, } impl OverflowLRUCache { pub fn new( capacity: NonZeroUsize, beacon_store: BeaconStore, - _custody_column_count: usize, + custody_column_count: Option, spec: ChainSpec, ) -> Result { let overflow_store = OverflowStore(beacon_store.clone()); @@ -705,6 +701,7 @@ impl OverflowLRUCache { state_cache: StateLRUCache::new(beacon_store, spec), maintenance_lock: Mutex::new(()), capacity, + custody_column_count, }) } @@ -760,16 +757,6 @@ impl OverflowLRUCache { block_root: Hash256, kzg_verified_data_columns: I, ) -> Result, AvailabilityCheckError> { - let mut fixed_data_columns = FixedVector::default(); - - for data_column in kzg_verified_data_columns { - if let Some(data_column_opt) = - fixed_data_columns.get_mut(data_column.data_column_index() as usize) - { - *data_column_opt = Some(data_column); - } - } - let mut write_lock = self.critical.write(); // Grab existing entry or create a new entry. @@ -778,12 +765,22 @@ impl OverflowLRUCache { .unwrap_or_else(|| PendingComponents::empty(block_root)); // Merge in the data columns. - pending_components.merge_data_columns(fixed_data_columns); - - write_lock.put_pending_components(block_root, pending_components, &self.overflow_store)?; + pending_components.merge_data_columns(kzg_verified_data_columns)?; - // TODO(das): Currently this does not change availability status and nor import yet. - Ok(Availability::MissingComponents(block_root)) + if pending_components.is_available(self.custody_column_count) { + // No need to hold the write lock anymore + drop(write_lock); + pending_components.make_available(|diet_block| { + self.state_cache.recover_pending_executed_block(diet_block) + }) + } else { + write_lock.put_pending_components( + block_root, + pending_components, + &self.overflow_store, + )?; + Ok(Availability::MissingComponents(block_root)) + } } pub fn put_kzg_verified_blobs>>( @@ -809,7 +806,7 @@ impl OverflowLRUCache { // Merge in the blobs. pending_components.merge_blobs(fixed_blobs); - if pending_components.is_available() { + if pending_components.is_available(self.custody_column_count) { // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(|diet_block| { @@ -848,7 +845,7 @@ impl OverflowLRUCache { pending_components.merge_block(diet_executed_block); // Check if we have all components and entire set is consistent. - if pending_components.is_available() { + if pending_components.is_available(self.custody_column_count) { // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(|diet_block| { @@ -1377,12 +1374,13 @@ mod test { let test_store = harness.chain.store.clone(); let capacity_non_zero = new_non_zero_usize(capacity); let cache = Arc::new( - OverflowLRUCache::::new(capacity_non_zero, test_store, spec.clone()) + OverflowLRUCache::::new(capacity_non_zero, test_store, None, spec.clone()) .expect("should create cache"), ); (harness, cache, chain_db_path) } + // TODO(das): add test for custody columns. #[tokio::test] async fn overflow_cache_test_insert_components() { type E = MinimalEthSpec; @@ -1881,6 +1879,7 @@ mod test { let recovered_cache = OverflowLRUCache::::new( new_non_zero_usize(capacity), harness.chain.store.clone(), + None, harness.chain.spec.clone(), ) .expect("should recover cache"); diff --git a/book/src/help_bn.md b/book/src/help_bn.md index e437925a0e8..b628cae145a 100644 --- a/book/src/help_bn.md +++ b/book/src/help_bn.md @@ -108,6 +108,9 @@ FLAGS: server on localhost:5052 and import deposit logs from the execution node. This is equivalent to `--http` on merge-ready networks, or `--http --eth1` pre-merge + --subscribe-all-data-column-subnets Subscribe to all data column subnets and participate in data custody for + all columns. This will also advertise the beacon node as being long-lived + subscribed to all data column subnets. --subscribe-all-subnets Subscribe to all subnets regardless of validator count. This will also advertise the beacon node as being long-lived subscribed to all subnets. --validator-monitor-auto Enables the automatic detection and monitoring of validators connected to