From a33ef119455c38fd978754edd08ef3b94aeded11 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Wed, 17 Apr 2024 21:21:39 +1000 Subject: [PATCH] Add DA filter for PeerDAS. --- .../src/data_availability_checker.rs | 2 +- .../src/data_availability_checker/error.rs | 2 + .../overflow_lru_cache.rs | 106 +++++++++++++----- 3 files changed, 78 insertions(+), 32 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 743b84e61ea..3b9b060a9e5 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -93,7 +93,7 @@ impl DataAvailabilityChecker { let overflow_cache = OverflowLRUCache::new( OVERFLOW_LRU_CAPACITY, store, - Some(custody_column_count), + custody_column_count, log.new(o!("service" => "availability_cache")), spec.clone(), )?; diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index 9e52b34185f..bea73631bee 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -10,6 +10,7 @@ pub enum Error { blob_commitment: KzgCommitment, block_commitment: KzgCommitment, }, + UnableToDetermineImportRequirement, Unexpected, SszTypes(ssz_types::Error), MissingBlobs, @@ -40,6 +41,7 @@ impl Error { | Error::Unexpected | Error::ParentStateMissing(_) | Error::BlockReplayError(_) + | Error::UnableToDetermineImportRequirement | Error::RebuildingStateCaches(_) => ErrorCategory::Internal, Error::Kzg(_) | Error::BlobIndexInvalid(_) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 211ec06c151..5852da7628e 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ 
b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -53,6 +53,8 @@ use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256}; /// /// The blobs are all gossip and kzg verified. /// The block has completed all verifications except the availability check. +/// TODO(das): this struct can potentially be refactored as blobs and data columns are mutually +/// exclusive and this could simplify `is_importable`. #[derive(Encode, Decode, Clone)] pub struct PendingComponents { pub block_root: Hash256, @@ -61,6 +63,11 @@ pub struct PendingComponents { pub executed_block: Option>, } +pub enum BlockImportRequirement { + AllBlobs, + CustodyColumns(usize), +} + impl PendingComponents { /// Returns an immutable reference to the cached block. pub fn get_cached_block(&self) -> &Option> { @@ -228,31 +235,43 @@ impl PendingComponents { /// /// Returns `true` if both the block exists and the number of received blobs / custody columns /// matches the number of expected blobs / custody columns. 
- pub fn is_available(&self, custody_column_count: Option, log: &Logger) -> bool { - debug!( - log, - "Checking block importability"; - "block_root" => %self.block_root, - "num_expected_data_columns" => ?custody_column_count, - "num_received_data_columns" => self.num_received_data_columns(), - "num_expected_blobs" => ?self.num_expected_blobs(), - "num_received_blobs" => self.num_received_blobs(), - ); - - if let Some(num_expected_blobs) = self.num_expected_blobs() { - // We don't expect any data columns if there's no blobs - let num_expected_data_columns = if num_expected_blobs > 0 { - custody_column_count.unwrap_or(0) - } else { - 0 - }; + pub fn is_available( + &self, + block_import_requirement: BlockImportRequirement, + log: &Logger, + ) -> bool { + match block_import_requirement { + BlockImportRequirement::AllBlobs => { + debug!( + log, + "Checking block and blob importability"; + "block_root" => %self.block_root, + "num_expected_blobs" => ?self.num_expected_blobs(), + "num_received_blobs" => self.num_received_blobs(), + ); + if let Some(num_expected_blobs) = self.num_expected_blobs() { + num_expected_blobs == self.num_received_blobs() + } else { + false + } + } + BlockImportRequirement::CustodyColumns(num_expected_columns) => { + let num_received_data_columns = self.num_received_data_columns(); + + debug!( + log, + "Checking block and data column importability"; + "block_root" => %self.block_root, + "num_received_data_columns" => num_received_data_columns, + ); - // TODO(das): migrate from checking blob count to checking only column count post - // EIP-7594 fork. 
- num_expected_blobs == self.num_received_blobs() - && num_expected_data_columns == self.num_received_data_columns() - } else { - false + if let Some(num_expected_blobs) = self.num_expected_blobs() { + // No data columns when there are 0 blobs + num_expected_blobs == 0 || num_expected_columns == num_received_data_columns + } else { + false + } + } } } @@ -691,7 +710,7 @@ pub struct OverflowLRUCache { /// The capacity of the LRU cache capacity: NonZeroUsize, /// The number of data columns the node is custodying. - custody_column_count: Option, + custody_column_count: usize, log: Logger, spec: ChainSpec, } @@ -700,7 +719,7 @@ impl OverflowLRUCache { pub fn new( capacity: NonZeroUsize, beacon_store: BeaconStore, - custody_column_count: Option, + custody_column_count: usize, log: Logger, spec: ChainSpec, ) -> Result { @@ -764,6 +783,27 @@ impl OverflowLRUCache { } } + fn block_import_requirement( + &self, + pending_components: &PendingComponents, + ) -> Result { + let epoch = pending_components + .epoch() + .ok_or(AvailabilityCheckError::UnableToDetermineImportRequirement)?; + + let peer_das_enabled = self + .spec + .peer_das_epoch + .map_or(false, |peer_das_epoch| epoch >= peer_das_epoch); + if peer_das_enabled { + Ok(BlockImportRequirement::CustodyColumns( + self.custody_column_count, + )) + } else { + Ok(BlockImportRequirement::AllBlobs) + } + } + pub fn put_kzg_verified_data_columns< I: IntoIterator>, >( @@ -781,7 +821,8 @@ impl OverflowLRUCache { // Merge in the data columns. pending_components.merge_data_columns(kzg_verified_data_columns)?; - if pending_components.is_available(self.custody_column_count, &self.log) { + let block_import_requirement = self.block_import_requirement(&pending_components)?; + if pending_components.is_available(block_import_requirement, &self.log) { // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(|diet_block| { @@ -820,7 +861,8 @@ impl OverflowLRUCache { // Merge in the blobs. 
pending_components.merge_blobs(fixed_blobs); - if pending_components.is_available(self.custody_column_count, &self.log) { + let block_import_requirement = self.block_import_requirement(&pending_components)?; + if pending_components.is_available(block_import_requirement, &self.log) { // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(|diet_block| { @@ -859,7 +901,8 @@ impl OverflowLRUCache { pending_components.merge_block(diet_executed_block); // Check if we have all components and entire set is consistent. - if pending_components.is_available(self.custody_column_count, &self.log) { + let block_import_requirement = self.block_import_requirement(&pending_components)?; + if pending_components.is_available(block_import_requirement, &self.log) { // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(|diet_block| { @@ -1168,6 +1211,7 @@ mod test { use types::{ExecPayload, MinimalEthSpec}; const LOW_VALIDATOR_COUNT: usize = 32; + const DEFAULT_TEST_CUSTODY_COLUMN_COUNT: usize = 4; fn get_store_with_spec( db_path: &TempDir, @@ -1391,7 +1435,7 @@ mod test { OverflowLRUCache::::new( capacity_non_zero, test_store, - None, + DEFAULT_TEST_CUSTODY_COLUMN_COUNT, harness.logger().clone(), spec.clone(), ) @@ -1899,7 +1943,7 @@ mod test { let recovered_cache = OverflowLRUCache::::new( new_non_zero_usize(capacity), harness.chain.store.clone(), - None, + DEFAULT_TEST_CUSTODY_COLUMN_COUNT, harness.logger().clone(), harness.chain.spec.clone(), )