Skip to content

Commit

Permalink
Add DataColumnSidecar gossip topic and message handling (#6147)
Browse files Browse the repository at this point in the history
* Add `DataColumnSidecar` gossip topic and verification (#5050 and #5783).

* Remove gossip verification changes (#5783).

* Merge branch 'unstable' into data-column-gossip

# Conflicts:
#	beacon_node/beacon_chain/src/data_column_verification.rs
#	beacon_node/beacon_chain/src/lib.rs

* Add gossip cache timeout for data columns. Rename data column metrics for consistency.

* Remove usage of `unimplemented!` and address review comments.

* Remove unnused `GossipDataColumnError` variants and address review comments.

* Merge branch 'unstable' into data-column-gossip

* Update Cargo.lock

* Arc `ChainSpec` in discovery to avoid performance regression when needing to clone it repeatedly.
  • Loading branch information
jimmygchen authored Jul 25, 2024
1 parent a2ab26c commit 4e5a363
Show file tree
Hide file tree
Showing 26 changed files with 907 additions and 31 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

87 changes: 87 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
use crate::errors::{BeaconChainError as Error, BlockProductionError};
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
Expand Down Expand Up @@ -2118,6 +2119,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}

pub fn verify_data_column_sidecar_for_gossip(
self: &Arc<Self>,
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
subnet_id: u64,
) -> Result<GossipVerifiedDataColumn<T>, GossipDataColumnError> {
metrics::inc_counter(&metrics::DATA_COLUMN_SIDECAR_PROCESSING_REQUESTS);
let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES);
GossipVerifiedDataColumn::new(data_column_sidecar, subnet_id, self).map(|v| {
metrics::inc_counter(&metrics::DATA_COLUMN_SIDECAR_PROCESSING_SUCCESSES);
v
})
}

pub fn verify_blob_sidecar_for_gossip(
self: &Arc<Self>,
blob_sidecar: Arc<BlobSidecar<T::EthSpec>>,
Expand Down Expand Up @@ -2964,6 +2978,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified(&block_root, r)
}

/// Cache the data columns in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
pub async fn process_gossip_data_columns(
self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let Ok(block_root) = data_columns
.iter()
.map(|c| c.block_root())
.unique()
.exactly_one()
else {
return Err(BlockError::InternalError(
"Columns should be from the same block".to_string(),
));
};

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its samples again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(block_root));
}

let r = self
.check_gossip_data_columns_availability_and_import(data_columns)
.await;
self.remove_notified_custody_columns(&block_root, r)
}

/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
pub async fn process_rpc_blobs(
Expand Down Expand Up @@ -3013,6 +3060,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
r
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified_custody_columns(
&self,
block_root: &Hash256,
r: Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let has_missing_components =
matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)));
if !has_missing_components {
self.reqresp_pre_import_cache.write().remove(block_root);
}
r
}

/// Wraps `process_block` in logic to cache the block's commitments in the processing cache
/// and evict if the block was imported or errored.
pub async fn process_block_with_early_caching<B: IntoExecutionPendingBlock<T>>(
Expand Down Expand Up @@ -3257,6 +3319,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.process_availability(slot, availability).await
}

/// Checks if the provided data column can make any cached blocks available, and imports immediately
/// if so, otherwise caches the data column in the data availability checker.
async fn check_gossip_data_columns_availability_and_import(
self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
if let Some(slasher) = self.slasher.as_ref() {
for data_colum in &data_columns {
slasher.accept_block_header(data_colum.signed_block_header());
}
}

let Ok(slot) = data_columns.iter().map(|c| c.slot()).unique().exactly_one() else {
return Err(BlockError::InternalError(
"Columns for the same block should have matching slot".to_string(),
));
};

let availability = self
.data_availability_checker
.put_gossip_data_columns(data_columns)?;

self.process_availability(slot, availability).await
}

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blob in the data availability checker.
async fn check_rpc_blob_availability_and_import(
Expand Down
39 changes: 39 additions & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use crate::block_verification_types::{
AsBlock, BlockContentsError, BlockImportData, GossipVerifiedBlockContents, RpcBlock,
};
use crate::data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock};
use crate::data_column_verification::GossipDataColumnError;
use crate::eth1_finalization_cache::Eth1FinalizationData;
use crate::execution_payload::{
is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block,
Expand Down Expand Up @@ -303,6 +304,13 @@ pub enum BlockError<E: EthSpec> {
/// TODO: We may need to penalize the peer that gave us a potentially invalid rpc blob.
/// https://github.com/sigp/lighthouse/issues/4546
AvailabilityCheck(AvailabilityCheckError),
/// An internal error has occurred when processing the block or sidecars.
///
/// ## Peer scoring
///
/// We were unable to process this block due to an internal error. It's unclear if the block is
/// valid.
InternalError(String),
}

impl<E: EthSpec> From<AvailabilityCheckError> for BlockError<E> {
Expand Down Expand Up @@ -523,6 +531,20 @@ impl<E: EthSpec> BlockSlashInfo<GossipBlobError<E>> {
}
}

impl BlockSlashInfo<GossipDataColumnError> {
pub fn from_early_error_data_column(
header: SignedBeaconBlockHeader,
e: GossipDataColumnError,
) -> Self {
match e {
GossipDataColumnError::ProposalSignatureInvalid => BlockSlashInfo::SignatureInvalid(e),
// `InvalidSignature` could indicate any signature in the block, so we want
// to recheck the proposer signature alone.
_ => BlockSlashInfo::SignatureNotChecked(header, e),
}
}
}

/// Process invalid blocks to see if they are suitable for the slasher.
///
/// If no slasher is configured, this is a no-op.
Expand Down Expand Up @@ -2007,6 +2029,23 @@ impl<E: EthSpec> BlockBlobError for GossipBlobError<E> {
}
}

impl BlockBlobError for GossipDataColumnError {
fn not_later_than_parent_error(data_column_slot: Slot, parent_slot: Slot) -> Self {
GossipDataColumnError::IsNotLaterThanParent {
data_column_slot,
parent_slot,
}
}

fn unknown_validator_error(validator_index: u64) -> Self {
GossipDataColumnError::UnknownValidator(validator_index)
}

fn proposer_signature_invalid() -> Self {
GossipDataColumnError::ProposalSignatureInvalid
}
}

/// Performs a cheap (time-efficient) state advancement so the committees and proposer shuffling for
/// `slot` can be obtained from `state`.
///
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod error;
mod overflow_lru_cache;
mod state_lru_cache;

use crate::data_column_verification::GossipVerifiedDataColumn;
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
use types::non_zero_usize::new_non_zero_usize;

Expand Down Expand Up @@ -188,6 +189,14 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
)
}

pub fn put_gossip_data_columns(
&self,
_gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
// TODO(das) to be implemented
Err(AvailabilityCheckError::Unexpected)
}

/// Check if we have all the blobs for a block. Returns `Availability` which has information
/// about whether all components have been received or more are required.
pub fn put_pending_executed_block(
Expand Down
Loading

0 comments on commit 4e5a363

Please sign in to comment.