
Add DataColumnSidecarsByRoot req/resp protocol #5196

Merged (10 commits, Mar 4, 2024)
95 changes: 78 additions & 17 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -7,9 +7,7 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches};
use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_verification::{
GossipBlobError, GossipVerifiedBlob, GossipVerifiedDataColumnSidecar,
};
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::POS_PANDA_BANNER;
use crate::block_verification::{
@@ -25,6 +23,7 @@ use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
@@ -124,6 +123,7 @@ use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::beacon_state::CloneConfig;
use types::blob_sidecar::{BlobSidecarList, FixedBlobSidecarList};
use types::data_column_sidecar::DataColumnSidecarList;
use types::payload::BlockProductionVersion;
use types::*;

@@ -1176,6 +1176,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_or_else(|| self.get_blobs(block_root), Ok)
}

pub fn get_data_columns_checking_early_attester_cache(
&self,
block_root: &Hash256,
) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
self.early_attester_cache
.get_data_columns(*block_root)
.map_or_else(|| self.get_data_columns(block_root), Ok)
}

/// Returns the block at the given root, if any.
///
/// ## Errors
@@ -1251,6 +1260,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

/// Returns the data columns at the given root, if any.
///
/// ## Errors
/// May return a database error.
pub fn get_data_columns(
&self,
block_root: &Hash256,
) -> Result<DataColumnSidecarList<T::EthSpec>, Error> {
match self.store.get_data_columns(block_root)? {
Some(data_columns) => Ok(data_columns),
None => Ok(DataColumnSidecarList::default()),
}
}

pub fn get_blinded_block(
&self,
block_root: &Hash256,
@@ -2088,10 +2111,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
subnet_id: u64,
) -> Result<GossipVerifiedDataColumnSidecar<T>, GossipBlobError<T::EthSpec>> {
) -> Result<GossipVerifiedDataColumn<T>, GossipDataColumnError<T::EthSpec>> {
metrics::inc_counter(&metrics::BLOBS_COLUMN_SIDECAR_PROCESSING_REQUESTS);
let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES);
GossipVerifiedDataColumnSidecar::new(data_column_sidecar, subnet_id, self).map(|v| {
GossipVerifiedDataColumn::new(data_column_sidecar, subnet_id, self).map(|v| {
metrics::inc_counter(&metrics::DATA_COLUMNS_SIDECAR_PROCESSING_SUCCESSES);
v
})
@@ -2912,18 +2935,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified(&block_root, r)
}

pub fn process_gossip_data_column(
/// Cache the data column in the processing cache, process it, then evict it from the cache if it
/// was imported or errored.
pub async fn process_gossip_data_column(
self: &Arc<Self>,
gossip_verified_data_column: GossipVerifiedDataColumnSidecar<T>,
) {
let data_column = gossip_verified_data_column.as_data_column();
// TODO(das) send to DA checker
info!(
self.log,
"Processed gossip data column";
"index" => data_column.index,
"slot" => data_column.slot().as_u64()
);
data_column: GossipVerifiedDataColumn<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let block_root = data_column.block_root();

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its samples again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown);
}

let r = self
.check_gossip_data_column_availability_and_import(data_column)
.await;
self.remove_notified(&block_root, r)
}

/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
@@ -3198,6 +3231,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.process_availability(slot, availability).await
}

/// Checks if the provided data column can make any cached blocks available, and imports immediately
/// if so, otherwise caches the data column in the data availability checker.
async fn check_gossip_data_column_availability_and_import(
self: &Arc<Self>,
data_column: GossipVerifiedDataColumn<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let slot = data_column.slot();
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(data_column.signed_block_header());
}
let availability = self
.data_availability_checker
.put_gossip_data_column(data_column)?;

self.process_availability(slot, availability).await
}
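The put-then-process flow above (insert a verified component into the availability checker, then act on the resulting `Availability`) can be sketched with a much-simplified standalone checker. All names here, and the fixed-expected-count completeness rule, are illustrative assumptions, not Lighthouse's real `DataAvailabilityChecker`:

```rust
use std::collections::HashMap;

// Hypothetical, simplified availability checker: components are cached per
// block root until an expected count arrives, at which point the block
// becomes "available" and is evicted from the cache.
enum Availability {
    MissingComponents(String), // block root still waiting on components
    Available(String),         // block root now has all components
}

struct Checker {
    expected: usize,
    received: HashMap<String, usize>,
}

impl Checker {
    fn new(expected: usize) -> Self {
        Checker { expected, received: HashMap::new() }
    }

    // Insert one component for `block_root` and report whether that made
    // the block available (mirroring put_gossip_data_column + process).
    fn put_component(&mut self, block_root: &str) -> Availability {
        let count = self.received.entry(block_root.to_string()).or_insert(0);
        *count += 1;
        if *count >= self.expected {
            self.received.remove(block_root); // evict once importable
            Availability::Available(block_root.to_string())
        } else {
            Availability::MissingComponents(block_root.to_string())
        }
    }
}

fn main() {
    let mut checker = Checker::new(2);
    assert!(matches!(checker.put_component("0xabc"), Availability::MissingComponents(_)));
    assert!(matches!(checker.put_component("0xabc"), Availability::Available(_)));
}
```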

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blob in the data availability checker.
async fn check_rpc_blob_availability_and_import(
@@ -3475,7 +3525,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If the write fails, revert fork choice to the version from disk, else we can
// end up with blocks in fork choice that are missing from disk.
// See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, blobs) = signed_block.deconstruct();
let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
let block = signed_block.message();
ops.extend(
confirmed_state_roots
@@ -3496,6 +3546,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

if let Some(data_columns) = data_columns {
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
ops.push(StoreOp::PutDataColumns(block_root, data_columns));
}
}

let txn_lock = self.store.hot_db.begin_rw_transaction();

if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
41 changes: 1 addition & 40 deletions beacon_node/beacon_chain/src/blob_verification.rs
@@ -17,8 +17,7 @@ use ssz_types::VariableList;
use tree_hash::TreeHash;
use types::blob_sidecar::BlobIdentifier;
use types::{
BeaconStateError, BlobSidecar, CloneConfig, DataColumnSidecar, EthSpec, Hash256,
SignedBeaconBlockHeader, Slot,
BeaconStateError, BlobSidecar, CloneConfig, EthSpec, Hash256, SignedBeaconBlockHeader, Slot,
};

/// An error occurred while validating a gossip blob.
@@ -185,33 +184,6 @@ pub type GossipVerifiedBlobList<T> = VariableList<
<<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxBlobsPerBlock,
>;

#[derive(Debug)]
pub struct GossipVerifiedDataColumnSidecar<T: BeaconChainTypes> {
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
}

impl<T: BeaconChainTypes> GossipVerifiedDataColumnSidecar<T> {
pub fn new(
column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
subnet_id: u64,
chain: &BeaconChain<T>,
) -> Result<Self, GossipBlobError<T::EthSpec>> {
let header = column_sidecar.signed_block_header.clone();
// We only process slashing info if the gossip verification failed
// since we do not process the blob any further in that case.
validate_data_column_sidecar_for_gossip(column_sidecar, subnet_id, chain).map_err(|e| {
process_block_slash_info::<_, GossipBlobError<T::EthSpec>>(
chain,
BlockSlashInfo::from_early_error_blob(header, e),
)
})
}

pub fn as_data_column(&self) -> &Arc<DataColumnSidecar<T::EthSpec>> {
&self.data_column_sidecar
}
}

/// A wrapper around a `BlobSidecar` that indicates it has been approved for re-gossiping on
/// the p2p network.
#[derive(Debug)]
@@ -675,17 +647,6 @@ pub fn validate_blob_sidecar_for_gossip<T: BeaconChainTypes>(
})
}

pub fn validate_data_column_sidecar_for_gossip<T: BeaconChainTypes>(
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
_subnet: u64,
_chain: &BeaconChain<T>,
) -> Result<GossipVerifiedDataColumnSidecar<T>, GossipBlobError<T::EthSpec>> {
// TODO(das): validate kzg commitments, cell proofs etc
Ok(GossipVerifiedDataColumnSidecar {
data_column_sidecar: data_column_sidecar.clone(),
})
}

/// Returns the canonical root of the given `blob`.
///
/// Use this function to ensure that we report the blob hashing time Prometheus metric.
32 changes: 32 additions & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
@@ -53,6 +53,7 @@ use crate::block_verification_types::{
AsBlock, BlockContentsError, BlockImportData, GossipVerifiedBlockContents, RpcBlock,
};
use crate::data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock};
use crate::data_column_verification::GossipDataColumnError;
use crate::eth1_finalization_cache::Eth1FinalizationData;
use crate::execution_payload::{
is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block,
@@ -528,6 +529,20 @@ impl<E: EthSpec> BlockSlashInfo<GossipBlobError<E>> {
}
}

impl<E: EthSpec> BlockSlashInfo<GossipDataColumnError<E>> {
pub fn from_early_error_data_column(
header: SignedBeaconBlockHeader,
e: GossipDataColumnError<E>,
) -> Self {
match e {
GossipDataColumnError::ProposalSignatureInvalid => BlockSlashInfo::SignatureInvalid(e),
// `InvalidSignature` could indicate any signature in the block, so we want
// to recheck the proposer signature alone.
_ => BlockSlashInfo::SignatureNotChecked(header, e),
}
}
}

/// Process invalid blocks to see if they are suitable for the slasher.
///
/// If no slasher is configured, this is a no-op.
@@ -2002,6 +2017,23 @@ impl<E: EthSpec> BlockBlobError for GossipBlobError<E> {
}
}

impl<E: EthSpec> BlockBlobError for GossipDataColumnError<E> {
fn not_later_than_parent_error(data_column_slot: Slot, parent_slot: Slot) -> Self {
GossipDataColumnError::DataColumnIsNotLaterThanParent {
data_column_slot,
parent_slot,
}
}

fn unknown_validator_error(validator_index: u64) -> Self {
GossipDataColumnError::UnknownValidator(validator_index)
}

fn proposer_signature_invalid() -> Self {
GossipDataColumnError::ProposalSignatureInvalid
}
}

/// Performs a cheap (time-efficient) state advancement so the committees and proposer shuffling for
/// `slot` can be obtained from `state`.
///
35 changes: 28 additions & 7 deletions beacon_node/beacon_chain/src/block_verification_types.rs
@@ -9,6 +9,7 @@ use ssz_types::VariableList;
use state_processing::ConsensusContext;
use std::sync::Arc;
use types::blob_sidecar::{BlobIdentifier, BlobSidecarError, FixedBlobSidecarList};
use types::data_column_sidecar::DataColumnSidecarList;
use types::{
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, Epoch, EthSpec, Hash256,
SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
@@ -43,20 +44,23 @@ impl<E: EthSpec> RpcBlock<E> {
match &self.block {
RpcBlockInner::Block(block) => block,
RpcBlockInner::BlockAndBlobs(block, _) => block,
RpcBlockInner::BlockAndDataColumns(block, _) => block,
}
}

pub fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match &self.block {
RpcBlockInner::Block(block) => block.clone(),
RpcBlockInner::BlockAndBlobs(block, _) => block.clone(),
RpcBlockInner::BlockAndDataColumns(block, _) => block.clone(),
}
}

pub fn blobs(&self) -> Option<&BlobSidecarList<E>> {
match &self.block {
RpcBlockInner::Block(_) => None,
RpcBlockInner::BlockAndBlobs(_, blobs) => Some(blobs),
RpcBlockInner::BlockAndDataColumns(_, _) => None,
}
}
}
@@ -72,6 +76,9 @@ enum RpcBlockInner<E: EthSpec> {
/// This variant is used with parent lookups and by-range responses. It should have all blobs
/// ordered, all block roots matching, and the correct number of blobs for this block.
BlockAndBlobs(Arc<SignedBeaconBlock<E>>, BlobSidecarList<E>),
/// This variant is used with parent lookups and by-range responses. It should have all data columns
/// ordered, all block roots matching, and the correct number of data columns for this block.
BlockAndDataColumns(Arc<SignedBeaconBlock<E>>, DataColumnSidecarList<E>),
}

impl<E: EthSpec> RpcBlock<E> {
@@ -141,25 +148,36 @@ impl<E: EthSpec> RpcBlock<E> {
Self::new(Some(block_root), block, blobs)
}

#[allow(clippy::type_complexity)]
pub fn deconstruct(
self,
) -> (
Hash256,
Arc<SignedBeaconBlock<E>>,
Option<BlobSidecarList<E>>,
Option<DataColumnSidecarList<E>>,
) {
let block_root = self.block_root();
match self.block {
RpcBlockInner::Block(block) => (block_root, block, None),
RpcBlockInner::BlockAndBlobs(block, blobs) => (block_root, block, Some(blobs)),
RpcBlockInner::Block(block) => (block_root, block, None, None),
RpcBlockInner::BlockAndBlobs(block, blobs) => (block_root, block, Some(blobs), None),
RpcBlockInner::BlockAndDataColumns(block, data_columns) => {
(block_root, block, None, Some(data_columns))
}
}
}
pub fn n_blobs(&self) -> usize {
match &self.block {
RpcBlockInner::Block(_) => 0,
RpcBlockInner::Block(_) | RpcBlockInner::BlockAndDataColumns(_, _) => 0,
Collaborator:

I think match arms are evaluated in order from top to bottom. There might be a weird edge case here where both data columns and blobs are available, but we return 0. If we're trying to support both blobs and data columns initially, this edge case has the potential to cause issues.

Member Author:

Yep, you're right - perhaps we can add the BlockAndBlobsAndDataColumns variant to the other arm?

Member Author:

This type is used for RPC responses, so I don't think blobs and data columns will coexist - we either request block + blobs or block + data columns during lookups and sync, depending on the fork we're at.

RpcBlockInner::BlockAndBlobs(_, blobs) => blobs.len(),
}
}
pub fn n_data_columns(&self) -> usize {
match &self.block {
RpcBlockInner::Block(_) | RpcBlockInner::BlockAndBlobs(_, _) => 0,
RpcBlockInner::BlockAndDataColumns(_, data_columns) => data_columns.len(),
}
}
}
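The reviewers' concern about arm order comes down to Rust trying match arms from top to bottom: the first arm whose pattern matches wins. A standalone sketch (illustrative names only, not Lighthouse code) shows how an earlier `(Some(_), _)` arm takes precedence when both components are present:

```rust
// Illustration: match arms are tried top to bottom, so when both Options
// are `Some`, the blobs arm "shadows" the data-column arm below it.
fn label(blobs: Option<u32>, columns: Option<u32>) -> &'static str {
    match (blobs, columns) {
        (None, None) => "block only",
        // Tried before the data-column arm; wins whenever blobs are present.
        (Some(_), _) => "block + blobs",
        (_, Some(_)) => "block + data columns",
    }
}

fn main() {
    assert_eq!(label(None, None), "block only");
    assert_eq!(label(Some(3), None), "block + blobs");
    assert_eq!(label(None, Some(64)), "block + data columns");
    // Both present: blobs win purely because of arm order.
    assert_eq!(label(Some(3), Some(64)), "block + blobs");
}
```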

/// A block that has gone through all pre-deneb block processing checks including block processing
@@ -485,12 +503,13 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
}

fn into_rpc_block(self) -> RpcBlock<E> {
let (block_root, block, blobs_opt) = self.deconstruct();
let (block_root, block, blobs_opt, data_columns_opt) = self.deconstruct();
// Circumvent the constructor here, because an Available block will have already had
// consistency checks performed.
let inner = match blobs_opt {
None => RpcBlockInner::Block(block),
Some(blobs) => RpcBlockInner::BlockAndBlobs(block, blobs),
let inner = match (blobs_opt, data_columns_opt) {
(None, None) => RpcBlockInner::Block(block),
(Some(blobs), _) => RpcBlockInner::BlockAndBlobs(block, blobs),
(_, Some(data_columns)) => RpcBlockInner::BlockAndDataColumns(block, data_columns),
Comment on lines +509 to +512

Collaborator (@eserilev, Feb 10, 2024):

What happens here if we have both blobs and data_columns available? I guess we'd return a BlockAndBlobs variant? Would that imply that if blobs are available, we'll just skip the data sampling checks?

Would a BlockAndBlobsAndDataColumns variant allow us to support both blobs and data_columns until we're ready to migrate away from blob subnets completely?

Member Author:

Yeah, originally I was thinking to prioritise blobs, but then it's kind of pointless because we aren't executing the data column code paths.

I've been thinking about this and talking to Sean about it as well. A potential way to introduce and test this may be to add column subnets and sampling (without making them mandatory for is_data_available, perhaps logging warnings on failure) and keep the existing blob subnets and DA filter logic - so an intermediate BlockAndBlobsAndDataColumns variant may make sense until we fully switch over to 1D PeerDAS.

Collaborator:

Can you write

match self {
    RpcBlockInner::* => RpcBlock
}

so the enum type guarantees that blobs and columns do not exist at the same time?

Member Author:

Not sure if I'm understanding correctly, but self is actually a struct type which contains optional blobs and columns:

pub struct AvailableBlock<E: EthSpec> {
    block_root: Hash256,
    block: Arc<SignedBeaconBlock<E>>,
    blobs: Option<BlobSidecarList<E>>,
    data_columns: Option<DataColumnSidecarList<E>>,
}

Member Author:

For an RpcBlock, we will have either block + blobs or block + samples if we've transitioned to DAS.
};
RpcBlock {
block_root,
@@ -522,12 +541,14 @@ impl<E: EthSpec> AsBlock<E> for RpcBlock<E> {
match &self.block {
RpcBlockInner::Block(block) => block,
RpcBlockInner::BlockAndBlobs(block, _) => block,
RpcBlockInner::BlockAndDataColumns(block, _) => block,
}
}
fn block_cloned(&self) -> Arc<SignedBeaconBlock<E>> {
match &self.block {
RpcBlockInner::Block(block) => block.clone(),
RpcBlockInner::BlockAndBlobs(block, _) => block.clone(),
RpcBlockInner::BlockAndDataColumns(block, _) => block.clone(),
}
}
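The reviewer's suggestion to let the type system rule out "blobs and columns at once" can be sketched by contrasting a struct of `Option`s with an enum. The names here are hypothetical, not the real Lighthouse types:

```rust
// Hypothetical sketch: the struct-of-Options permits the ambiguous
// both-Some state, while the enum makes it unrepresentable.
struct WithOptions {
    blobs: Option<Vec<u8>>,
    columns: Option<Vec<u8>>, // nothing stops both fields being Some
}

enum Exactly {
    BlockOnly,
    WithBlobs(Vec<u8>),
    WithColumns(Vec<u8>), // "blobs and columns" is simply not constructible
}

fn describe(e: &Exactly) -> &'static str {
    match e {
        Exactly::BlockOnly => "block",
        Exactly::WithBlobs(_) => "blobs",
        Exactly::WithColumns(_) => "columns",
    }
}

fn main() {
    // The struct allows a state the enum forbids by construction.
    let _ambiguous = WithOptions { blobs: Some(vec![1]), columns: Some(vec![2]) };
    assert_eq!(describe(&Exactly::WithBlobs(vec![1])), "blobs");
    assert_eq!(describe(&Exactly::BlockOnly), "block");
}
```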
fn canonical_root(&self) -> Hash256 {