Import gossip data column into data availability checker (#6197)

* Import gossip data column into data availability checker

jimmygchen authored Aug 2, 2024
1 parent 0e96d4f commit acd3151
Showing 8 changed files with 128 additions and 41 deletions.
36 changes: 23 additions & 13 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -2959,9 +2959,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         self: &Arc<Self>,
         data_columns: Vec<GossipVerifiedDataColumn<T>>,
     ) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
-        let Ok(block_root) = data_columns
+        let Ok((slot, block_root)) = data_columns
             .iter()
-            .map(|c| c.block_root())
+            .map(|c| (c.slot(), c.block_root()))
             .unique()
             .exactly_one()
         else {
@@ -2981,7 +2981,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         }

         let r = self
-            .check_gossip_data_columns_availability_and_import(data_columns)
+            .check_gossip_data_columns_availability_and_import(slot, block_root, data_columns)
             .await;
         self.remove_notified_custody_columns(&block_root, r)
     }
@@ -3298,6 +3298,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
     /// if so, otherwise caches the data column in the data availability checker.
     async fn check_gossip_data_columns_availability_and_import(
         self: &Arc<Self>,
+        slot: Slot,
+        block_root: Hash256,
         data_columns: Vec<GossipVerifiedDataColumn<T>>,
     ) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
         if let Some(slasher) = self.slasher.as_ref() {
@@ -3306,15 +3308,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             }
         }

-        let Ok(slot) = data_columns.iter().map(|c| c.slot()).unique().exactly_one() else {
-            return Err(BlockError::InternalError(
-                "Columns for the same block should have matching slot".to_string(),
-            ));
-        };
-
-        let availability = self
-            .data_availability_checker
-            .put_gossip_data_columns(data_columns)?;
+        let availability = self.data_availability_checker.put_gossip_data_columns(
+            slot,
+            block_root,
+            data_columns,
+        )?;

         self.process_availability(slot, availability).await
     }
@@ -3629,7 +3627,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // If the write fails, revert fork choice to the version from disk, else we can
         // end up with blocks in fork choice that are missing from disk.
         // See https://github.com/sigp/lighthouse/issues/2028
-        let (_, signed_block, blobs) = signed_block.deconstruct();
+        let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
         let block = signed_block.message();
         ops.extend(
             confirmed_state_roots
@@ -3650,6 +3648,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             }
         }

+        if let Some(_data_columns) = data_columns {
+            // TODO(das): depends on https://github.com/sigp/lighthouse/pull/6073
+            // if !data_columns.is_empty() {
+            //     debug!(
+            //         self.log, "Writing data_columns to store";
+            //         "block_root" => %block_root,
+            //         "count" => data_columns.len(),
+            //     );
+            //     ops.push(StoreOp::PutDataColumns(block_root, data_columns));
+            // }
+        }
+
         let txn_lock = self.store.hot_db.begin_rw_transaction();

         if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
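For orientation, here is a standalone sketch of the uniqueness check that the updated `process_gossip_data_columns` performs. It is not Lighthouse code: `Column` and the plain `u64`/`&str` fields stand in for `GossipVerifiedDataColumn`, `Slot`, and `Hash256`, and only the `itertools` crate (already used by the real code) is assumed.

use itertools::Itertools;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Column {
    slot: u64,
    block_root: &'static str,
}

// All columns in a gossip batch must reference the same (slot, block_root)
// pair; `exactly_one` fails if the de-duplicated iterator has 0 or >1 items.
fn unique_slot_and_root(columns: &[Column]) -> Result<(u64, &'static str), String> {
    columns
        .iter()
        .map(|c| (c.slot, c.block_root))
        .unique()
        .exactly_one()
        .map_err(|_| "columns must all reference the same block".to_string())
}

fn main() {
    let columns = vec![
        Column { slot: 100, block_root: "0xabc" },
        Column { slot: 100, block_root: "0xabc" },
    ];
    // Both columns reference the same block, so this prints Ok((100, "0xabc")).
    println!("{:?}", unique_slot_and_root(&columns));
}

Checking the `(slot, block_root)` pair up front is what lets the slot be threaded through to `check_gossip_data_columns_availability_and_import`, replacing the second slot-uniqueness pass that this commit deletes further down.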
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/block_verification_types.rs
@@ -517,7 +517,8 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
     }

     fn into_rpc_block(self) -> RpcBlock<E> {
-        let (block_root, block, blobs_opt) = self.deconstruct();
+        // TODO(das): rpc data columns to be merged from `das` branch
+        let (block_root, block, blobs_opt, _data_columns_opt) = self.deconstruct();
         // Circumvent the constructor here, because an Available block will have already had
         // consistency checks performed.
         let inner = match blobs_opt {
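The change above follows from `AvailableBlock::deconstruct` now returning a four-element tuple (see the next file). A minimal stand-in sketch of that shape, with simplified field types rather than the real Lighthouse definitions:

// Stand-in types: String and Vec<String> replace the real block, blob and
// data column sidecar types; only the tuple shape matters here.
struct AvailableBlock {
    block_root: [u8; 32],
    block: String,
    blobs: Option<Vec<String>>,
    data_columns: Option<Vec<String>>,
}

impl AvailableBlock {
    // Mirrors the widened tuple introduced in this commit.
    fn deconstruct(self) -> ([u8; 32], String, Option<Vec<String>>, Option<Vec<String>>) {
        let AvailableBlock { block_root, block, blobs, data_columns } = self;
        (block_root, block, blobs, data_columns)
    }
}

fn main() {
    let block = AvailableBlock {
        block_root: [0u8; 32],
        block: "block".into(),
        blobs: None,
        data_columns: Some(vec!["column_0".into()]),
    };
    // Blob-only callers, like `into_rpc_block` above, bind and ignore the new element.
    let (_root, _block, blobs, _data_columns_opt) = block.deconstruct();
    println!("blobs: {:?}", blobs);
}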
33 changes: 27 additions & 6 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -14,13 +14,16 @@ use std::sync::Arc;
 use std::time::Duration;
 use task_executor::TaskExecutor;
 use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
-use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};
+use types::{
+    BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock,
+    Slot,
+};

 mod error;
 mod overflow_lru_cache;
 mod state_lru_cache;

-use crate::data_column_verification::GossipVerifiedDataColumn;
+use crate::data_column_verification::{GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn};
 pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
 use types::non_zero_usize::new_non_zero_usize;

@@ -191,10 +194,18 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {

     pub fn put_gossip_data_columns(
         &self,
-        _gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
+        slot: Slot,
+        block_root: Hash256,
+        gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
     ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
-        // TODO(das) to be implemented
-        Err(AvailabilityCheckError::Unexpected)
+        let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
+        let custody_columns = gossip_data_columns
+            .into_iter()
+            .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
+            .collect::<Vec<_>>();
+
+        self.availability_cache
+            .put_kzg_verified_data_columns(block_root, epoch, custody_columns)
     }

     /// Check if we have all the blobs for a block. Returns `Availability` which has information
@@ -231,6 +242,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
                 block_root,
                 block,
                 blobs: None,
+                data_columns: None,
                 blobs_available_timestamp: None,
             }))
         }
@@ -251,6 +263,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
                 block_root,
                 block,
                 blobs: verified_blobs,
+                data_columns: None,
                 blobs_available_timestamp: None,
             }))
         }
@@ -297,6 +310,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
                 block_root,
                 block,
                 blobs: None,
+                data_columns: None,
                 blobs_available_timestamp: None,
             }))
         }
@@ -312,6 +326,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
                 block_root,
                 block,
                 blobs: verified_blobs,
+                data_columns: None,
                 blobs_available_timestamp: None,
             }))
         }
@@ -477,6 +492,7 @@ pub struct AvailableBlock<E: EthSpec> {
     block_root: Hash256,
     block: Arc<SignedBeaconBlock<E>>,
     blobs: Option<BlobSidecarList<E>>,
+    data_columns: Option<DataColumnSidecarList<E>>,
     /// Timestamp at which this block first became available (UNIX timestamp, time since 1970).
     blobs_available_timestamp: Option<Duration>,
 }
@@ -486,11 +502,13 @@ impl<E: EthSpec> AvailableBlock<E> {
         block_root: Hash256,
         block: Arc<SignedBeaconBlock<E>>,
         blobs: Option<BlobSidecarList<E>>,
+        data_columns: Option<DataColumnSidecarList<E>>,
     ) -> Self {
         Self {
             block_root,
             block,
             blobs,
+            data_columns,
             blobs_available_timestamp: None,
         }
     }
@@ -510,20 +528,23 @@ impl<E: EthSpec> AvailableBlock<E> {
         self.blobs_available_timestamp
     }

+    #[allow(clippy::type_complexity)]
     pub fn deconstruct(
         self,
     ) -> (
         Hash256,
         Arc<SignedBeaconBlock<E>>,
         Option<BlobSidecarList<E>>,
+        Option<DataColumnSidecarList<E>>,
     ) {
         let AvailableBlock {
             block_root,
             block,
             blobs,
+            data_columns,
             blobs_available_timestamp: _,
         } = self;
-        (block_root, block, blobs)
+        (block_root, block, blobs, data_columns)
     }
 }
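To summarise the new `put_gossip_data_columns` flow, here is a standalone sketch with stand-in types: `GossipColumn`, `CustodyColumn`, and a plain `Vec` in place of the overflow LRU cache are illustrative only, and the epoch arithmetic assumes the mainnet 32-slot epoch.

const SLOTS_PER_EPOCH: u64 = 32; // mainnet value, standing in for T::EthSpec::slots_per_epoch()

struct GossipColumn { index: u64 }  // stand-in for GossipVerifiedDataColumn<T>
struct CustodyColumn { index: u64 } // stand-in for KzgVerifiedCustodyDataColumn<E>

impl CustodyColumn {
    // Mirrors `from_asserted_custody`: no extra validation, the caller asserts
    // that the column belongs to this node's custody set.
    fn from_asserted_custody(c: GossipColumn) -> Self {
        CustodyColumn { index: c.index }
    }
}

// Sketch of the new method body: derive the epoch from the slot, convert the
// batch, and hand it to the availability cache keyed by (block_root, epoch).
fn put_gossip_data_columns(
    cache: &mut Vec<([u8; 32], u64, Vec<CustodyColumn>)>,
    slot: u64,
    block_root: [u8; 32],
    gossip_columns: Vec<GossipColumn>,
) {
    let epoch = slot / SLOTS_PER_EPOCH;
    let custody_columns: Vec<CustodyColumn> = gossip_columns
        .into_iter()
        .map(CustodyColumn::from_asserted_custody)
        .collect();
    cache.push((block_root, epoch, custody_columns));
}

fn main() {
    let mut cache = Vec::new();
    put_gossip_data_columns(&mut cache, 100, [0u8; 32], vec![GossipColumn { index: 3 }]);
    println!("cached batches: {}", cache.len());
}

The conversion performs no additional checks: gossip verification has already established KZG validity, and (per the doc comment added in `data_column_verification.rs`) the caller asserts the columns fall within the node's custody requirements.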
beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -217,7 +217,11 @@ impl<E: EthSpec> PendingComponents<E> {
     ///
     /// WARNING: This function can potentially take a lot of time if the state needs to be
     /// reconstructed from disk. Ensure you are not holding any write locks while calling this.
-    pub fn make_available<R>(self, recover: R) -> Result<Availability<E>, AvailabilityCheckError>
+    pub fn make_available<R>(
+        self,
+        block_import_requirement: BlockImportRequirement,
+        recover: R,
+    ) -> Result<Availability<E>, AvailabilityCheckError>
     where
         R: FnOnce(
             DietAvailabilityPendingExecutedBlock<E>,
@@ -226,7 +230,7 @@ impl<E: EthSpec> PendingComponents<E> {
         let Self {
             block_root,
             verified_blobs,
-            verified_data_columns: _,
+            verified_data_columns,
             executed_block,
         } = self;

@@ -239,17 +243,29 @@ impl<E: EthSpec> PendingComponents<E> {
         let Some(diet_executed_block) = executed_block else {
             return Err(AvailabilityCheckError::Unexpected);
         };
-        let num_blobs_expected = diet_executed_block.num_blobs_expected();
-        let Some(verified_blobs) = verified_blobs
-            .into_iter()
-            .cloned()
-            .map(|b| b.map(|b| b.to_blob()))
-            .take(num_blobs_expected)
-            .collect::<Option<Vec<_>>>()
-        else {
-            return Err(AvailabilityCheckError::Unexpected);
+
+        let (blobs, data_columns) = match block_import_requirement {
+            BlockImportRequirement::AllBlobs => {
+                let num_blobs_expected = diet_executed_block.num_blobs_expected();
+                let Some(verified_blobs) = verified_blobs
+                    .into_iter()
+                    .cloned()
+                    .map(|b| b.map(|b| b.to_blob()))
+                    .take(num_blobs_expected)
+                    .collect::<Option<Vec<_>>>()
+                else {
+                    return Err(AvailabilityCheckError::Unexpected);
+                };
+                (Some(VariableList::new(verified_blobs)?), None)
+            }
+            BlockImportRequirement::CustodyColumns(_) => {
+                let verified_data_columns = verified_data_columns
+                    .into_iter()
+                    .map(|d| d.into_inner())
+                    .collect();
+                (None, Some(verified_data_columns))
+            }
         };
-        let verified_blobs = VariableList::new(verified_blobs)?;

         let executed_block = recover(diet_executed_block)?;

@@ -262,7 +278,8 @@ impl<E: EthSpec> PendingComponents<E> {
         let available_block = AvailableBlock {
             block_root,
             block,
-            blobs: Some(verified_blobs),
+            blobs,
+            data_columns,
             blobs_available_timestamp,
         };
         Ok(Availability::Available(Box::new(
@@ -404,7 +421,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
             write_lock.put(block_root, pending_components.clone());
             // No need to hold the write lock anymore
             drop(write_lock);
-            pending_components.make_available(|diet_block| {
+            pending_components.make_available(block_import_requirement, |diet_block| {
                 self.state_cache.recover_pending_executed_block(diet_block)
             })
         } else {
@@ -413,7 +430,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
         }
     }

-    // TODO(das): gossip and rpc code paths to be implemented.
+    // TODO(das): rpc code paths to be implemented.
     #[allow(dead_code)]
     pub fn put_kzg_verified_data_columns<
         I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<T::EthSpec>>,
@@ -439,7 +456,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
             write_lock.put(block_root, pending_components.clone());
             // No need to hold the write lock anymore
             drop(write_lock);
-            pending_components.make_available(|diet_block| {
+            pending_components.make_available(block_import_requirement, |diet_block| {
                 self.state_cache.recover_pending_executed_block(diet_block)
             })
         } else {
@@ -478,7 +495,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
             write_lock.put(block_root, pending_components.clone());
             // No need to hold the write lock anymore
             drop(write_lock);
-            pending_components.make_available(|diet_block| {
+            pending_components.make_available(block_import_requirement, |diet_block| {
                 self.state_cache.recover_pending_executed_block(diet_block)
             })
         } else {
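The heart of the change to `make_available` is the branch on the block's import requirement. A simplified, self-contained sketch follows; `String` payloads stand in for real blob and column sidecars, and `BlockImportRequirement` is modelled as a two-variant enum with a `usize` count payload, matching only how the diff pattern-matches it.

enum BlockImportRequirement {
    AllBlobs,
    CustodyColumns(usize), // stand-in payload for the custody column count
}

fn components_for_import(
    requirement: BlockImportRequirement,
    verified_blobs: Vec<Option<String>>,
    verified_data_columns: Vec<String>,
    num_blobs_expected: usize,
) -> Result<(Option<Vec<String>>, Option<Vec<String>>), String> {
    match requirement {
        BlockImportRequirement::AllBlobs => {
            // Every expected blob must be present; a single missing Option
            // fails the whole collect, mirroring the let-else in the real code.
            let blobs = verified_blobs
                .into_iter()
                .take(num_blobs_expected)
                .collect::<Option<Vec<_>>>()
                .ok_or_else(|| "missing blob".to_string())?;
            Ok((Some(blobs), None))
        }
        BlockImportRequirement::CustodyColumns(_) => {
            // Columns were filtered to this node's custody set upstream.
            Ok((None, Some(verified_data_columns)))
        }
    }
}

fn main() {
    let result = components_for_import(
        BlockImportRequirement::CustodyColumns(4),
        vec![],
        vec!["column_0".into(), "column_1".into()],
        0,
    );
    println!("{:?}", result);
}

In this function either blobs or custody columns are attached to the resulting `AvailableBlock`, never both, which is why the struct gained a separate optional `data_columns` field.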
19 changes: 19 additions & 0 deletions beacon_node/beacon_chain/src/data_column_verification.rs
@@ -190,6 +190,10 @@ impl<T: BeaconChainTypes> GossipVerifiedDataColumn<T> {
     pub fn signed_block_header(&self) -> SignedBeaconBlockHeader {
         self.data_column.data.signed_block_header.clone()
     }
+
+    pub fn into_inner(self) -> KzgVerifiedDataColumn<T::EthSpec> {
+        self.data_column
+    }
 }

 /// Wrapper over a `DataColumnSidecar` for which we have completed kzg verification.
@@ -204,6 +208,9 @@ impl<E: EthSpec> KzgVerifiedDataColumn<E> {
     pub fn new(data_column: Arc<DataColumnSidecar<E>>, kzg: &Kzg) -> Result<Self, KzgError> {
         verify_kzg_for_data_column(data_column, kzg)
     }
+    pub fn to_data_column(self) -> Arc<DataColumnSidecar<E>> {
+        self.data
+    }
     pub fn as_data_column(&self) -> &DataColumnSidecar<E> {
         &self.data
     }
@@ -226,9 +233,21 @@ pub struct KzgVerifiedCustodyDataColumn<E: EthSpec> {
 }

 impl<E: EthSpec> KzgVerifiedCustodyDataColumn<E> {
+    /// Mark a column as custody column. Caller must ensure that our current custody requirements
+    /// include this column
+    pub fn from_asserted_custody(kzg_verified: KzgVerifiedDataColumn<E>) -> Self {
+        Self {
+            data: kzg_verified.to_data_column(),
+        }
+    }
+
     pub fn index(&self) -> ColumnIndex {
         self.data.index
     }
+
+    pub fn into_inner(self) -> Arc<DataColumnSidecar<E>> {
+        self.data
+    }
 }

 /// Complete kzg verification for a `DataColumnSidecar`.
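The accessors added in this file support the conversion chain used by `put_gossip_data_columns`: gossip wrapper → KZG-verified wrapper → custody wrapper. A runnable stand-in sketch (simplified types, not the real sidecar definitions):

use std::sync::Arc;

struct DataColumnSidecar { index: u64 }

struct KzgVerifiedDataColumn { data: Arc<DataColumnSidecar> }
struct GossipVerifiedDataColumn { data_column: KzgVerifiedDataColumn }
struct KzgVerifiedCustodyDataColumn { data: Arc<DataColumnSidecar> }

impl GossipVerifiedDataColumn {
    // Unwrap to the KZG-verified inner value (added in this commit).
    fn into_inner(self) -> KzgVerifiedDataColumn {
        self.data_column
    }
}

impl KzgVerifiedDataColumn {
    fn to_data_column(self) -> Arc<DataColumnSidecar> {
        self.data
    }
}

impl KzgVerifiedCustodyDataColumn {
    // Re-tag as a custody column; the caller asserts the column is in our
    // custody set, so no further checks happen here.
    fn from_asserted_custody(kzg_verified: KzgVerifiedDataColumn) -> Self {
        Self { data: kzg_verified.to_data_column() }
    }

    fn index(&self) -> u64 {
        self.data.index
    }
}

fn main() {
    let gossip = GossipVerifiedDataColumn {
        data_column: KzgVerifiedDataColumn {
            data: Arc::new(DataColumnSidecar { index: 7 }),
        },
    };
    let custody = KzgVerifiedCustodyDataColumn::from_asserted_custody(gossip.into_inner());
    println!("custody column index: {}", custody.index());
}

Usage mirrors the real call `KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())` in the data availability checker.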