Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP custody lookup sync #28

Draft
wants to merge 5 commits into
base: das
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 68 additions & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::data_column_verification::{
CustodyDataColumn, GossipDataColumnError, GossipVerifiedDataColumn,
};
use crate::early_attester_cache::EarlyAttesterCache;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
Expand Down Expand Up @@ -3074,6 +3076,33 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified(&block_root, r)
}

/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
pub async fn process_rpc_custody_columns(
self: &Arc<Self>,
block_root: Hash256,
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(block_root));
}

// TODO(das): custody column SSE event
// TODO(das): Why is the slot necessary here?
let slot = Slot::new(0);

let r = self
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
.await;
self.remove_notified(&block_root, r)
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified(
Expand Down Expand Up @@ -3374,6 +3403,44 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.process_availability(slot, availability).await
}

/// Checks if the provided columns can make any cached blocks available, and imports immediately
/// if so, otherwise caches the columns in the data availability checker.
async fn check_rpc_custody_columns_availability_and_import(
self: &Arc<Self>,
slot: Slot,
block_root: Hash256,
custody_columns: Vec<CustodyDataColumn<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
let mut slashable_cache = self.observed_slashable.write();
for header in custody_columns
.iter()
.map(|c| c.as_data_column().signed_block_header.clone())
.unique()
{
if verify_header_signature::<T, BlockError<T::EthSpec>>(self, &header).is_ok() {
slashable_cache
.observe_slashable(
header.message.slot,
header.message.proposer_index,
block_root,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(header);
}
}
}
}
let availability = self
.data_availability_checker
.put_rpc_custody_columns(block_root, custody_columns)?;

self.process_availability(slot, availability).await
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
///
/// An error is returned if the block was unable to be imported. It may be partially imported
Expand Down
73 changes: 60 additions & 13 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::blob_verification::{GossipBlobError, GossipVerifiedBlobList};
use crate::block_verification::BlockError;
use crate::data_availability_checker::AvailabilityCheckError;
pub use crate::data_availability_checker::{AvailableBlock, MaybeAvailableBlock};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumnList};
use crate::data_column_verification::{
CustodyDataColumn, CustodyDataColumnList, GossipDataColumnError, GossipVerifiedDataColumnList,
};
use crate::eth1_finalization_cache::Eth1FinalizationData;
use crate::{get_block_root, GossipVerifiedBlock, PayloadVerificationOutcome};
use derivative::Derivative;
Expand All @@ -11,7 +13,7 @@ use state_processing::ConsensusContext;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use types::blob_sidecar::{self, BlobIdentifier, FixedBlobSidecarList};
use types::data_column_sidecar::{self, DataColumnSidecarList};
use types::data_column_sidecar::{self};
use types::{
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, Epoch, EthSpec, Hash256,
SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
Expand Down Expand Up @@ -52,23 +54,23 @@ impl<E: EthSpec> RpcBlock<E> {
match &self.block {
RpcBlockInner::Block(block) => block,
RpcBlockInner::BlockAndBlobs(block, _) => block,
RpcBlockInner::BlockAndDataColumns(block, _) => block,
RpcBlockInner::BlockAndCustodyColumns(block, _) => block,
}
}

pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match &self.block {
RpcBlockInner::Block(block) => block.clone(),
RpcBlockInner::BlockAndBlobs(block, _) => block.clone(),
RpcBlockInner::BlockAndDataColumns(block, _) => block.clone(),
RpcBlockInner::BlockAndCustodyColumns(block, _) => block.clone(),
}
}

pub fn blobs(&self) -> Option<&BlobSidecarList<E>> {
match &self.block {
RpcBlockInner::Block(_) => None,
RpcBlockInner::BlockAndBlobs(_, blobs) => Some(blobs),
RpcBlockInner::BlockAndDataColumns(_, _) => None,
RpcBlockInner::BlockAndCustodyColumns(_, _) => None,
}
}
}
Expand All @@ -86,7 +88,10 @@ enum RpcBlockInner<E: EthSpec> {
BlockAndBlobs(Arc<SignedBeaconBlock<E>>, BlobSidecarList<E>),
/// This variant is used with parent lookups and by-range responses. It should have all
/// requested data columns, all block roots matching for this block.
BlockAndDataColumns(Arc<SignedBeaconBlock<E>>, DataColumnSidecarList<E>),
BlockAndCustodyColumns(
Arc<SignedBeaconBlock<E>>,
VariableList<CustodyDataColumn<E>, <E as EthSpec>::DataColumnCount>,
),
}

impl<E: EthSpec> RpcBlock<E> {
Expand Down Expand Up @@ -144,6 +149,35 @@ impl<E: EthSpec> RpcBlock<E> {
})
}

pub fn new_with_custody_columns(
block_root: Option<Hash256>,
block: Arc<SignedBeaconBlock<E>>,
custody_columns: Vec<CustodyDataColumn<E>>,
) -> Result<Self, AvailabilityCheckError> {
let block_root = block_root.unwrap_or_else(|| get_block_root(&block));

if let Ok(block_commitments) = block.message().body().blob_kzg_commitments() {
// The number of required custody columns is out of scope here.
if !block_commitments.is_empty() && custody_columns.is_empty() {
return Err(AvailabilityCheckError::MissingCustodyColumns);
}
}
// Treat empty blob lists as if they are missing.
let inner = if custody_columns.is_empty() {
RpcBlockInner::BlockAndCustodyColumns(
block,
VariableList::new(custody_columns)
.expect("TODO(das): custody vec should never exceed len"),
)
} else {
RpcBlockInner::Block(block)
};
Ok(Self {
block_root,
block: inner,
})
}

pub fn new_from_fixed(
block_root: Hash256,
block: Arc<SignedBeaconBlock<E>>,
Expand All @@ -168,27 +202,27 @@ impl<E: EthSpec> RpcBlock<E> {
Hash256,
Arc<SignedBeaconBlock<E>>,
Option<BlobSidecarList<E>>,
Option<DataColumnSidecarList<E>>,
Option<CustodyDataColumnList<E>>,
) {
let block_root = self.block_root();
match self.block {
RpcBlockInner::Block(block) => (block_root, block, None, None),
RpcBlockInner::BlockAndBlobs(block, blobs) => (block_root, block, Some(blobs), None),
RpcBlockInner::BlockAndDataColumns(block, data_columns) => {
RpcBlockInner::BlockAndCustodyColumns(block, data_columns) => {
(block_root, block, None, Some(data_columns))
}
}
}
pub fn n_blobs(&self) -> usize {
match &self.block {
RpcBlockInner::Block(_) | RpcBlockInner::BlockAndDataColumns(_, _) => 0,
RpcBlockInner::Block(_) | RpcBlockInner::BlockAndCustodyColumns(_, _) => 0,
RpcBlockInner::BlockAndBlobs(_, blobs) => blobs.len(),
}
}
pub fn n_data_columns(&self) -> usize {
match &self.block {
RpcBlockInner::Block(_) | RpcBlockInner::BlockAndBlobs(_, _) => 0,
RpcBlockInner::BlockAndDataColumns(_, data_columns) => data_columns.len(),
RpcBlockInner::BlockAndCustodyColumns(_, data_columns) => data_columns.len(),
}
}
}
Expand Down Expand Up @@ -545,7 +579,20 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
let inner = match (blobs_opt, data_columns_opt) {
(None, None) => RpcBlockInner::Block(block),
(Some(blobs), _) => RpcBlockInner::BlockAndBlobs(block, blobs),
(_, Some(data_columns)) => RpcBlockInner::BlockAndDataColumns(block, data_columns),
(_, Some(data_columns)) => RpcBlockInner::BlockAndCustodyColumns(
block,
VariableList::new(
data_columns
.into_iter()
// TODO(das): This is an ugly hack that should be removed. After updating
// store types to handle custody data columns this should not be required.
// It's okay-ish because available blocks must have all the required custody
// columns.
.map(|d| CustodyDataColumn::from_asserted_custody(d))
.collect(),
)
.expect("data column list is within bounds"),
),
};
RpcBlock {
block_root,
Expand Down Expand Up @@ -577,14 +624,14 @@ impl<E: EthSpec> AsBlock<E> for RpcBlock<E> {
match &self.block {
RpcBlockInner::Block(block) => block,
RpcBlockInner::BlockAndBlobs(block, _) => block,
RpcBlockInner::BlockAndDataColumns(block, _) => block,
RpcBlockInner::BlockAndCustodyColumns(block, _) => block,
}
}
fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match &self.block {
RpcBlockInner::Block(block) => block.clone(),
RpcBlockInner::BlockAndBlobs(block, _) => block.clone(),
RpcBlockInner::BlockAndDataColumns(block, _) => block.clone(),
RpcBlockInner::BlockAndCustodyColumns(block, _) => block.clone(),
}
}
fn canonical_root(&self) -> Hash256 {
Expand Down
Loading
Loading