Skip to content

Commit

Permalink
Add DA filter for PeerDAS.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmygchen committed Apr 17, 2024
1 parent 467da4a commit a33ef11
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 32 deletions.
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
let overflow_cache = OverflowLRUCache::new(
OVERFLOW_LRU_CAPACITY,
store,
Some(custody_column_count),
custody_column_count,
log.new(o!("service" => "availability_cache")),
spec.clone(),
)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub enum Error {
blob_commitment: KzgCommitment,
block_commitment: KzgCommitment,
},
UnableToDetermineImportRequirement,
Unexpected,
SszTypes(ssz_types::Error),
MissingBlobs,
Expand Down Expand Up @@ -40,6 +41,7 @@ impl Error {
| Error::Unexpected
| Error::ParentStateMissing(_)
| Error::BlockReplayError(_)
| Error::UnableToDetermineImportRequirement
| Error::RebuildingStateCaches(_) => ErrorCategory::Internal,
Error::Kzg(_)
| Error::BlobIndexInvalid(_)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256};
///
/// The blobs are all gossip and kzg verified.
/// The block has completed all verifications except the availability check.
/// TODO(das): this struct can potentially be refactored as blobs and data columns are mutually
/// exclusive and this could simplify `is_importable`.
#[derive(Encode, Decode, Clone)]
pub struct PendingComponents<E: EthSpec> {
pub block_root: Hash256,
Expand All @@ -61,6 +63,11 @@ pub struct PendingComponents<E: EthSpec> {
pub executed_block: Option<DietAvailabilityPendingExecutedBlock<E>>,
}

/// The data that must be received, in addition to the block itself, before the block can be
/// considered available for import.
pub enum BlockImportRequirement {
    /// All blobs committed to by the block must be received (used before PeerDAS is enabled).
    AllBlobs,
    /// The given number of custody data columns must be received (used once the PeerDAS
    /// epoch has been reached); the value is the node's custody column count.
    CustodyColumns(usize),
}

impl<E: EthSpec> PendingComponents<E> {
/// Returns an immutable reference to the cached block.
pub fn get_cached_block(&self) -> &Option<DietAvailabilityPendingExecutedBlock<E>> {
Expand Down Expand Up @@ -228,31 +235,43 @@ impl<E: EthSpec> PendingComponents<E> {
///
/// Returns `true` if both the block exists and the number of received blobs / custody columns
/// matches the number of expected blobs / custody columns.
pub fn is_available(&self, custody_column_count: Option<usize>, log: &Logger) -> bool {
debug!(
log,
"Checking block importability";
"block_root" => %self.block_root,
"num_expected_data_columns" => ?custody_column_count,
"num_received_data_columns" => self.num_received_data_columns(),
"num_expected_blobs" => ?self.num_expected_blobs(),
"num_received_blobs" => self.num_received_blobs(),
);

if let Some(num_expected_blobs) = self.num_expected_blobs() {
// We don't expect any data columns if there's no blobs
let num_expected_data_columns = if num_expected_blobs > 0 {
custody_column_count.unwrap_or(0)
} else {
0
};
pub fn is_available(
&self,
block_import_requirement: BlockImportRequirement,
log: &Logger,
) -> bool {
match block_import_requirement {
BlockImportRequirement::AllBlobs => {
debug!(
log,
"Checking block and blob importability";
"block_root" => %self.block_root,
"num_expected_blobs" => ?self.num_expected_blobs(),
"num_received_blobs" => self.num_received_blobs(),
);
if let Some(num_expected_blobs) = self.num_expected_blobs() {
num_expected_blobs == self.num_received_blobs()
} else {
false
}
}
BlockImportRequirement::CustodyColumns(num_expected_columns) => {
let num_received_data_columns = self.num_received_data_columns();

debug!(
log,
"Checking block and data column importability";
"block_root" => %self.block_root,
"num_received_data_columns" => num_received_data_columns,
);

// TODO(das): migrate from checking blob count to checking only column count post
// EIP-7594 fork.
num_expected_blobs == self.num_received_blobs()
&& num_expected_data_columns == self.num_received_data_columns()
} else {
false
if let Some(num_expected_blobs) = self.num_expected_blobs() {
// No data columns when there are 0 blobs
num_expected_blobs == 0 || num_expected_columns == num_received_data_columns
} else {
false
}
}
}
}

Expand Down Expand Up @@ -691,7 +710,7 @@ pub struct OverflowLRUCache<T: BeaconChainTypes> {
/// The capacity of the LRU cache
capacity: NonZeroUsize,
/// The number of data columns the node is custodying.
custody_column_count: Option<usize>,
custody_column_count: usize,
log: Logger,
spec: ChainSpec,
}
Expand All @@ -700,7 +719,7 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
pub fn new(
capacity: NonZeroUsize,
beacon_store: BeaconStore<T>,
custody_column_count: Option<usize>,
custody_column_count: usize,
log: Logger,
spec: ChainSpec,
) -> Result<Self, AvailabilityCheckError> {
Expand Down Expand Up @@ -764,6 +783,27 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
}
}

/// Determines what data, beyond the block itself, must be received before import:
/// custody data columns once the PeerDAS epoch has been reached, all blobs otherwise.
///
/// Returns `UnableToDetermineImportRequirement` when the pending components carry no
/// epoch (nothing has been received from which to derive it).
fn block_import_requirement(
    &self,
    pending_components: &PendingComponents<T::EthSpec>,
) -> Result<BlockImportRequirement, AvailabilityCheckError> {
    match pending_components.epoch() {
        // Without an epoch we cannot tell whether PeerDAS is active for this block.
        None => Err(AvailabilityCheckError::UnableToDetermineImportRequirement),
        Some(epoch) => {
            // PeerDAS is active iff an activation epoch is scheduled and has been reached.
            let requirement = match self.spec.peer_das_epoch {
                Some(peer_das_epoch) if epoch >= peer_das_epoch => {
                    BlockImportRequirement::CustodyColumns(self.custody_column_count)
                }
                _ => BlockImportRequirement::AllBlobs,
            };
            Ok(requirement)
        }
    }
}

pub fn put_kzg_verified_data_columns<
I: IntoIterator<Item = KzgVerifiedDataColumn<T::EthSpec>>,
>(
Expand All @@ -781,7 +821,8 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
// Merge in the data columns.
pending_components.merge_data_columns(kzg_verified_data_columns)?;

if pending_components.is_available(self.custody_column_count, &self.log) {
let block_import_requirement = self.block_import_requirement(&pending_components)?;
if pending_components.is_available(block_import_requirement, &self.log) {
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
Expand Down Expand Up @@ -820,7 +861,8 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
// Merge in the blobs.
pending_components.merge_blobs(fixed_blobs);

if pending_components.is_available(self.custody_column_count, &self.log) {
let block_import_requirement = self.block_import_requirement(&pending_components)?;
if pending_components.is_available(block_import_requirement, &self.log) {
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
Expand Down Expand Up @@ -859,7 +901,8 @@ impl<T: BeaconChainTypes> OverflowLRUCache<T> {
pending_components.merge_block(diet_executed_block);

// Check if we have all components and entire set is consistent.
if pending_components.is_available(self.custody_column_count, &self.log) {
let block_import_requirement = self.block_import_requirement(&pending_components)?;
if pending_components.is_available(block_import_requirement, &self.log) {
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
Expand Down Expand Up @@ -1168,6 +1211,7 @@ mod test {
use types::{ExecPayload, MinimalEthSpec};

const LOW_VALIDATOR_COUNT: usize = 32;
const DEFAULT_TEST_CUSTODY_COLUMN_COUNT: usize = 4;

fn get_store_with_spec<E: EthSpec>(
db_path: &TempDir,
Expand Down Expand Up @@ -1391,7 +1435,7 @@ mod test {
OverflowLRUCache::<T>::new(
capacity_non_zero,
test_store,
None,
DEFAULT_TEST_CUSTODY_COLUMN_COUNT,
harness.logger().clone(),
spec.clone(),
)
Expand Down Expand Up @@ -1899,7 +1943,7 @@ mod test {
let recovered_cache = OverflowLRUCache::<T>::new(
new_non_zero_usize(capacity),
harness.chain.store.clone(),
None,
DEFAULT_TEST_CUSTODY_COLUMN_COUNT,
harness.logger().clone(),
harness.chain.spec.clone(),
)
Expand Down

0 comments on commit a33ef11

Please sign in to comment.