Skip to content

Commit

Permalink
Implement custody sync
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed May 8, 2024
1 parent bc190e7 commit 0a212f0
Show file tree
Hide file tree
Showing 22 changed files with 1,418 additions and 344 deletions.
69 changes: 68 additions & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::data_column_verification::{
CustodyDataColumn, GossipDataColumnError, GossipVerifiedDataColumn,
};
use crate::early_attester_cache::EarlyAttesterCache;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
Expand Down Expand Up @@ -3074,6 +3076,33 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified(&block_root, r)
}

/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
pub async fn process_rpc_custody_columns(
self: &Arc<Self>,
block_root: Hash256,
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(block_root));
}

// TODO(das): custody column SSE event
// TODO(das): Why is the slot necessary here?
let slot = Slot::new(0);

let r = self
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
.await;
self.remove_notified(&block_root, r)
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified(
Expand Down Expand Up @@ -3374,6 +3403,44 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.process_availability(slot, availability).await
}

/// Checks if the provided columns can make any cached blocks available, and imports immediately
/// if so, otherwise caches the columns in the data availability checker.
async fn check_rpc_custody_columns_availability_and_import(
self: &Arc<Self>,
slot: Slot,
block_root: Hash256,
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
let mut slashable_cache = self.observed_slashable.write();
for header in custody_columns
.iter()
.map(|c| c.as_data_column().signed_block_header.clone())
.unique()
{
if verify_header_signature::<T, BlockError<T::EthSpec>>(self, &header).is_ok() {
slashable_cache
.observe_slashable(
header.message.slot,
header.message.proposer_index,
block_root,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(header);
}
}
}
}
let availability = self
.data_availability_checker
.put_rpc_custody_columns(block_root, custody_columns)?;

self.process_availability(slot, availability).await
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
///
/// An error is returned if the block was unable to be imported. It may be partially imported
Expand Down
73 changes: 60 additions & 13 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::blob_verification::{GossipBlobError, GossipVerifiedBlobList};
use crate::block_verification::BlockError;
use crate::data_availability_checker::AvailabilityCheckError;
pub use crate::data_availability_checker::{AvailableBlock, MaybeAvailableBlock};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumnList};
use crate::data_column_verification::{
CustodyDataColumn, CustodyDataColumnList, GossipDataColumnError, GossipVerifiedDataColumnList,
};
use crate::eth1_finalization_cache::Eth1FinalizationData;
use crate::{get_block_root, GossipVerifiedBlock, PayloadVerificationOutcome};
use derivative::Derivative;
Expand All @@ -11,7 +13,7 @@ use state_processing::ConsensusContext;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use types::blob_sidecar::{self, BlobIdentifier, FixedBlobSidecarList};
use types::data_column_sidecar::{self, DataColumnSidecarList};
use types::data_column_sidecar::{self};
use types::{
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, Epoch, EthSpec, Hash256,
SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
Expand Down Expand Up @@ -52,23 +54,23 @@ impl<E: EthSpec> RpcBlock<E> {
match &self.block {
RpcBlockInner::Block(block) => block,
RpcBlockInner::BlockAndBlobs(block, _) => block,
RpcBlockInner::BlockAndDataColumns(block, _) => block,
RpcBlockInner::BlockAndCustodyColumns(block, _) => block,
}
}

pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match &self.block {
RpcBlockInner::Block(block) => block.clone(),
RpcBlockInner::BlockAndBlobs(block, _) => block.clone(),
RpcBlockInner::BlockAndDataColumns(block, _) => block.clone(),
RpcBlockInner::BlockAndCustodyColumns(block, _) => block.clone(),
}
}

pub fn blobs(&self) -> Option<&BlobSidecarList<E>> {
match &self.block {
RpcBlockInner::Block(_) => None,
RpcBlockInner::BlockAndBlobs(_, blobs) => Some(blobs),
RpcBlockInner::BlockAndDataColumns(_, _) => None,
RpcBlockInner::BlockAndCustodyColumns(_, _) => None,
}
}
}
Expand All @@ -86,7 +88,10 @@ enum RpcBlockInner<E: EthSpec> {
BlockAndBlobs(Arc<SignedBeaconBlock<E>>, BlobSidecarList<E>),
/// This variant is used with parent lookups and by-range responses. It should have all
/// requested data columns, all block roots matching for this block.
BlockAndDataColumns(Arc<SignedBeaconBlock<E>>, DataColumnSidecarList<E>),
BlockAndCustodyColumns(
Arc<SignedBeaconBlock<E>>,
VariableList<CustodyDataColumn<E>, <E as EthSpec>::DataColumnCount>,
),
}

impl<E: EthSpec> RpcBlock<E> {
Expand Down Expand Up @@ -144,6 +149,35 @@ impl<E: EthSpec> RpcBlock<E> {
})
}

pub fn new_with_custody_columns(
block_root: Option<Hash256>,
block: Arc<SignedBeaconBlock<E>>,
custody_columns: Vec<CustodyDataColumn<E>>,
) -> Result<Self, AvailabilityCheckError> {
let block_root = block_root.unwrap_or_else(|| get_block_root(&block));

if let Ok(block_commitments) = block.message().body().blob_kzg_commitments() {
// The number of required custody columns is out of scope here.
if block_commitments.len() > 0 && custody_columns.len() == 0 {
return Err(AvailabilityCheckError::MissingCustodyColumns);
}
}
// Treat empty blob lists as if they are missing.
let inner = if custody_columns.is_empty() {
RpcBlockInner::BlockAndCustodyColumns(
block,
VariableList::new(custody_columns)
.expect("TODO(das): custody vec should never exceed len"),
)
} else {
RpcBlockInner::Block(block)
};
Ok(Self {
block_root,
block: inner,
})
}

pub fn new_from_fixed(
block_root: Hash256,
block: Arc<SignedBeaconBlock<E>>,
Expand All @@ -168,27 +202,27 @@ impl<E: EthSpec> RpcBlock<E> {
Hash256,
Arc<SignedBeaconBlock<E>>,
Option<BlobSidecarList<E>>,
Option<DataColumnSidecarList<E>>,
Option<CustodyDataColumnList<E>>,
) {
let block_root = self.block_root();
match self.block {
RpcBlockInner::Block(block) => (block_root, block, None, None),
RpcBlockInner::BlockAndBlobs(block, blobs) => (block_root, block, Some(blobs), None),
RpcBlockInner::BlockAndDataColumns(block, data_columns) => {
RpcBlockInner::BlockAndCustodyColumns(block, data_columns) => {
(block_root, block, None, Some(data_columns))
}
}
}
pub fn n_blobs(&self) -> usize {
match &self.block {
RpcBlockInner::Block(_) | RpcBlockInner::BlockAndDataColumns(_, _) => 0,
RpcBlockInner::Block(_) | RpcBlockInner::BlockAndCustodyColumns(_, _) => 0,
RpcBlockInner::BlockAndBlobs(_, blobs) => blobs.len(),
}
}
pub fn n_data_columns(&self) -> usize {
match &self.block {
RpcBlockInner::Block(_) | RpcBlockInner::BlockAndBlobs(_, _) => 0,
RpcBlockInner::BlockAndDataColumns(_, data_columns) => data_columns.len(),
RpcBlockInner::BlockAndCustodyColumns(_, data_columns) => data_columns.len(),
}
}
}
Expand Down Expand Up @@ -545,7 +579,20 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
let inner = match (blobs_opt, data_columns_opt) {
(None, None) => RpcBlockInner::Block(block),
(Some(blobs), _) => RpcBlockInner::BlockAndBlobs(block, blobs),
(_, Some(data_columns)) => RpcBlockInner::BlockAndDataColumns(block, data_columns),
(_, Some(data_columns)) => RpcBlockInner::BlockAndCustodyColumns(
block,
VariableList::new(
data_columns
.into_iter()
// TODO(das): This is an ugly hack that should be removed. After updating
// store types to handle custody data columns this should not be required.
// It's okay-ish because available blocks must have all the required custody
// columns.
.map(|d| CustodyDataColumn::from_asserted_custody(d))
.collect(),
)
.expect("data column list is within bounds"),
),
};
RpcBlock {
block_root,
Expand Down Expand Up @@ -577,14 +624,14 @@ impl<E: EthSpec> AsBlock<E> for RpcBlock<E> {
match &self.block {
RpcBlockInner::Block(block) => block,
RpcBlockInner::BlockAndBlobs(block, _) => block,
RpcBlockInner::BlockAndDataColumns(block, _) => block,
RpcBlockInner::BlockAndCustodyColumns(block, _) => block,
}
}
fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match &self.block {
RpcBlockInner::Block(block) => block.clone(),
RpcBlockInner::BlockAndBlobs(block, _) => block.clone(),
RpcBlockInner::BlockAndDataColumns(block, _) => block.clone(),
RpcBlockInner::BlockAndCustodyColumns(block, _) => block.clone(),
}
}
fn canonical_root(&self) -> Hash256 {
Expand Down
Loading

0 comments on commit 0a212f0

Please sign in to comment.