Skip to content

Commit

Permalink
Add DataColumnSidecarsByRoot req/resp protocol (#5196)
Browse files Browse the repository at this point in the history
* Add stub for `DataColumnsByRoot`

* Add basic implementation of serving RPC data column from DA checker.

* Store data columns in early attester cache and blobs db.

* Apply suggestions from code review

Co-authored-by: Eitan Seri-Levi <eserilev@gmail.com>
Co-authored-by: Jacob Kaufmann <jacobkaufmann18@gmail.com>

* Fix build.

* Store `DataColumnInfo` in database and various cleanups.

* Update `DataColumnSidecar` ssz max size and remove panic code.

---------

Co-authored-by: Eitan Seri-Levi <eserilev@gmail.com>
Co-authored-by: Jacob Kaufmann <jacobkaufmann18@gmail.com>
  • Loading branch information
3 people authored Mar 4, 2024
1 parent 28756da commit ae470d7
Show file tree
Hide file tree
Showing 42 changed files with 1,465 additions and 131 deletions.
95 changes: 78 additions & 17 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches};
use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_verification::{
GossipBlobError, GossipVerifiedBlob, GossipVerifiedDataColumnSidecar,
};
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::POS_PANDA_BANNER;
use crate::block_verification::{
Expand All @@ -25,6 +23,7 @@ use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
Expand Down Expand Up @@ -124,6 +123,7 @@ use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::beacon_state::CloneConfig;
use types::blob_sidecar::{BlobSidecarList, FixedBlobSidecarList};
use types::data_column_sidecar::DataColumnSidecarList;
use types::payload::BlockProductionVersion;
use types::*;

Expand Down Expand Up @@ -1176,6 +1176,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_or_else(|| self.get_blobs(block_root), Ok)
}

pub fn get_data_columns_checking_early_attester_cache(
&self,
block_root: &Hash256,
) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
self.early_attester_cache
.get_data_columns(*block_root)
.map_or_else(|| self.get_data_columns(block_root), Ok)
}

/// Returns the block at the given root, if any.
///
/// ## Errors
Expand Down Expand Up @@ -1251,6 +1260,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

/// Returns the data columns at the given root, if any.
///
/// ## Errors
/// May return a database error.
pub fn get_data_columns(
&self,
block_root: &Hash256,
) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
match self.store.get_data_columns(block_root)? {
Some(data_columns) => Ok(data_columns),
None => Ok(DataColumnSidecarList::default()),
}
}

pub fn get_blinded_block(
&self,
block_root: &Hash256,
Expand Down Expand Up @@ -2088,10 +2111,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
subnet_id: u64,
) -> Result<GossipVerifiedDataColumnSidecar<T>, GossipBlobError<T::EthSpec>> {
) -> Result<GossipVerifiedDataColumn<T>, GossipDataColumnError<T::EthSpec>> {
metrics::inc_counter(&metrics::BLOBS_COLUMN_SIDECAR_PROCESSING_REQUESTS);
let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES);
GossipVerifiedDataColumnSidecar::new(data_column_sidecar, subnet_id, self).map(|v| {
GossipVerifiedDataColumn::new(data_column_sidecar, subnet_id, self).map(|v| {
metrics::inc_counter(&metrics::DATA_COLUMNS_SIDECAR_PROCESSING_SUCCESSES);
v
})
Expand Down Expand Up @@ -2912,18 +2935,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified(&block_root, r)
}

pub fn process_gossip_data_column(
/// Cache the data column in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
pub async fn process_gossip_data_column(
self: &Arc<Self>,
gossip_verified_data_column: GossipVerifiedDataColumnSidecar<T>,
) {
let data_column = gossip_verified_data_column.as_data_column();
// TODO(das) send to DA checker
info!(
self.log,
"Processed gossip data column";
"index" => data_column.index,
"slot" => data_column.slot().as_u64()
);
data_column: GossipVerifiedDataColumn<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let block_root = data_column.block_root();

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its samples again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown);
}

let r = self
.check_gossip_data_column_availability_and_import(data_column)
.await;
self.remove_notified(&block_root, r)
}

/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
Expand Down Expand Up @@ -3198,6 +3231,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.process_availability(slot, availability).await
}

/// Checks if the provided data column can make any cached blocks available, and imports immediately
/// if so, otherwise caches the data column in the data availability checker.
async fn check_gossip_data_column_availability_and_import(
self: &Arc<Self>,
data_column: GossipVerifiedDataColumn<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let slot = data_column.slot();
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(data_column.signed_block_header());
}
let availability = self
.data_availability_checker
.put_gossip_data_column(data_column)?;

self.process_availability(slot, availability).await
}

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blob in the data availability checker.
async fn check_rpc_blob_availability_and_import(
Expand Down Expand Up @@ -3475,7 +3525,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If the write fails, revert fork choice to the version from disk, else we can
// end up with blocks in fork choice that are missing from disk.
// See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, blobs) = signed_block.deconstruct();
let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
let block = signed_block.message();
ops.extend(
confirmed_state_roots
Expand All @@ -3496,6 +3546,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

if let Some(data_columns) = data_columns {
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
ops.push(StoreOp::PutDataColumns(block_root, data_columns));
}
}

let txn_lock = self.store.hot_db.begin_rw_transaction();

if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
Expand Down
41 changes: 1 addition & 40 deletions beacon_node/beacon_chain/src/blob_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ use ssz_types::VariableList;
use tree_hash::TreeHash;
use types::blob_sidecar::BlobIdentifier;
use types::{
BeaconStateError, BlobSidecar, CloneConfig, DataColumnSidecar, EthSpec, Hash256,
SignedBeaconBlockHeader, Slot,
BeaconStateError, BlobSidecar, CloneConfig, EthSpec, Hash256, SignedBeaconBlockHeader, Slot,
};

/// An error occurred while validating a gossip blob.
Expand Down Expand Up @@ -185,33 +184,6 @@ pub type GossipVerifiedBlobList<T> = VariableList<
<<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxBlobsPerBlock,
>;

#[derive(Debug)]
pub struct GossipVerifiedDataColumnSidecar<T: BeaconChainTypes> {
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
}

impl<T: BeaconChainTypes> GossipVerifiedDataColumnSidecar<T> {
pub fn new(
column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
subnet_id: u64,
chain: &BeaconChain<T>,
) -> Result<Self, GossipBlobError<T::EthSpec>> {
let header = column_sidecar.signed_block_header.clone();
// We only process slashing info if the gossip verification failed
// since we do not process the blob any further in that case.
validate_data_column_sidecar_for_gossip(column_sidecar, subnet_id, chain).map_err(|e| {
process_block_slash_info::<_, GossipBlobError<T::EthSpec>>(
chain,
BlockSlashInfo::from_early_error_blob(header, e),
)
})
}

pub fn as_data_column(&self) -> &Arc<DataColumnSidecar<T::EthSpec>> {
&self.data_column_sidecar
}
}

/// A wrapper around a `BlobSidecar` that indicates it has been approved for re-gossiping on
/// the p2p network.
#[derive(Debug)]
Expand Down Expand Up @@ -675,17 +647,6 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
})
}

pub fn validate_data_column_sidecar_for_gossip<T: BeaconChainTypes>(
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
_subnet: u64,
_chain: &BeaconChain<T>,
) -> Result<GossipVerifiedDataColumnSidecar<T>, GossipBlobError<T::EthSpec>> {
// TODO(das): validate kzg commitments, cell proofs etc
Ok(GossipVerifiedDataColumnSidecar {
data_column_sidecar: data_column_sidecar.clone(),
})
}

/// Returns the canonical root of the given `blob`.
///
/// Use this function to ensure that we report the blob hashing time Prometheus metric.
Expand Down
32 changes: 32 additions & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use crate::block_verification_types::{
AsBlock, BlockContentsError, BlockImportData, GossipVerifiedBlockContents, RpcBlock,
};
use crate::data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock};
use crate::data_column_verification::GossipDataColumnError;
use crate::eth1_finalization_cache::Eth1FinalizationData;
use crate::execution_payload::{
is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block,
Expand Down Expand Up @@ -528,6 +529,20 @@ impl<E: EthSpec> BlockSlashInfo<GossipBlobError<E>> {
}
}

impl<E: EthSpec> BlockSlashInfo<GossipDataColumnError<E>> {
pub fn from_early_error_data_column(
header: SignedBeaconBlockHeader,
e: GossipDataColumnError<E>,
) -> Self {
match e {
GossipDataColumnError::ProposalSignatureInvalid => BlockSlashInfo::SignatureInvalid(e),
// `InvalidSignature` could indicate any signature in the block, so we want
// to recheck the proposer signature alone.
_ => BlockSlashInfo::SignatureNotChecked(header, e),
}
}
}

/// Process invalid blocks to see if they are suitable for the slasher.
///
/// If no slasher is configured, this is a no-op.
Expand Down Expand Up @@ -2002,6 +2017,23 @@ impl<E: EthSpec> BlockBlobError for GossipBlobError<E> {
}
}

impl<E: EthSpec> BlockBlobError for GossipDataColumnError<E> {
fn not_later_than_parent_error(data_column_slot: Slot, parent_slot: Slot) -> Self {
GossipDataColumnError::DataColumnIsNotLaterThanParent {
data_column_slot,
parent_slot,
}
}

fn unknown_validator_error(validator_index: u64) -> Self {
GossipDataColumnError::UnknownValidator(validator_index)
}

fn proposer_signature_invalid() -> Self {
GossipDataColumnError::ProposalSignatureInvalid
}
}

/// Performs a cheap (time-efficient) state advancement so the committees and proposer shuffling for
/// `slot` can be obtained from `state`.
///
Expand Down
35 changes: 28 additions & 7 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use ssz_types::VariableList;
use state_processing::ConsensusContext;
use std::sync::Arc;
use types::blob_sidecar::{BlobIdentifier, BlobSidecarError, FixedBlobSidecarList};
use types::data_column_sidecar::DataColumnSidecarList;
use types::{
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, Epoch, EthSpec, Hash256,
SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
Expand Down Expand Up @@ -43,20 +44,23 @@ impl<E: EthSpec> RpcBlock<E> {
match &self.block {
RpcBlockInner::Block(block) => block,
RpcBlockInner::BlockAndBlobs(block, _) => block,
RpcBlockInner::BlockAndDataColumns(block, _) => block,
}
}

pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match &self.block {
RpcBlockInner::Block(block) => block.clone(),
RpcBlockInner::BlockAndBlobs(block, _) => block.clone(),
RpcBlockInner::BlockAndDataColumns(block, _) => block.clone(),
}
}

pub fn blobs(&self) -> Option<&BlobSidecarList<E>> {
match &self.block {
RpcBlockInner::Block(_) => None,
RpcBlockInner::BlockAndBlobs(_, blobs) => Some(blobs),
RpcBlockInner::BlockAndDataColumns(_, _) => None,
}
}
}
Expand All @@ -72,6 +76,9 @@ enum RpcBlockInner<E: EthSpec> {
/// This variant is used with parent lookups and by-range responses. It should have all blobs
/// ordered, all block roots matching, and the correct number of blobs for this block.
BlockAndBlobs(Arc<SignedBeaconBlock<E>>, BlobSidecarList<E>),
/// This variant is used with parent lookups and by-range responses. It should have all data columns
/// ordered, all block roots matching, and the correct number of data columns for this block.
BlockAndDataColumns(Arc<SignedBeaconBlock<E>>, DataColumnSidecarList<E>),
}

impl<E: EthSpec> RpcBlock<E> {
Expand Down Expand Up @@ -141,25 +148,36 @@ impl<E: EthSpec> RpcBlock<E> {
Self::new(Some(block_root), block, blobs)
}

#[allow(clippy::type_complexity)]
pub fn deconstruct(
self,
) -> (
Hash256,
Arc<SignedBeaconBlock<E>>,
Option<BlobSidecarList<E>>,
Option<DataColumnSidecarList<E>>,
) {
let block_root = self.block_root();
match self.block {
RpcBlockInner::Block(block) => (block_root, block, None),
RpcBlockInner::BlockAndBlobs(block, blobs) => (block_root, block, Some(blobs)),
RpcBlockInner::Block(block) => (block_root, block, None, None),
RpcBlockInner::BlockAndBlobs(block, blobs) => (block_root, block, Some(blobs), None),
RpcBlockInner::BlockAndDataColumns(block, data_columns) => {
(block_root, block, None, Some(data_columns))
}
}
}
pub fn n_blobs(&self) -> usize {
match &self.block {
RpcBlockInner::Block(_) => 0,
RpcBlockInner::Block(_) | RpcBlockInner::BlockAndDataColumns(_, _) => 0,
RpcBlockInner::BlockAndBlobs(_, blobs) => blobs.len(),
}
}
pub fn n_data_columns(&self) -> usize {
match &self.block {
RpcBlockInner::Block(_) | RpcBlockInner::BlockAndBlobs(_, _) => 0,
RpcBlockInner::BlockAndDataColumns(_, data_columns) => data_columns.len(),
}
}
}

/// A block that has gone through all pre-deneb block processing checks including block processing
Expand Down Expand Up @@ -485,12 +503,13 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
}

fn into_rpc_block(self) -> RpcBlock<E> {
let (block_root, block, blobs_opt) = self.deconstruct();
let (block_root, block, blobs_opt, data_columns_opt) = self.deconstruct();
// Circumvent the constructor here, because an Available block will have already had
// consistency checks performed.
let inner = match blobs_opt {
None => RpcBlockInner::Block(block),
Some(blobs) => RpcBlockInner::BlockAndBlobs(block, blobs),
let inner = match (blobs_opt, data_columns_opt) {
(None, None) => RpcBlockInner::Block(block),
(Some(blobs), _) => RpcBlockInner::BlockAndBlobs(block, blobs),
(_, Some(data_columns)) => RpcBlockInner::BlockAndDataColumns(block, data_columns),
};
RpcBlock {
block_root,
Expand Down Expand Up @@ -522,12 +541,14 @@ impl<E: EthSpec> AsBlock<E> for RpcBlock<E> {
match &self.block {
RpcBlockInner::Block(block) => block,
RpcBlockInner::BlockAndBlobs(block, _) => block,
RpcBlockInner::BlockAndDataColumns(block, _) => block,
}
}
fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match &self.block {
RpcBlockInner::Block(block) => block.clone(),
RpcBlockInner::BlockAndBlobs(block, _) => block.clone(),
RpcBlockInner::BlockAndDataColumns(block, _) => block.clone(),
}
}
fn canonical_root(&self) -> Hash256 {
Expand Down
Loading

0 comments on commit ae470d7

Please sign in to comment.