Skip to content

Commit

Permalink
PoC dedicated process X errors
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Apr 30, 2024
1 parent c8ffafb commit b0797fa
Show file tree
Hide file tree
Showing 9 changed files with 216 additions and 111 deletions.
91 changes: 53 additions & 38 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::POS_PANDA_BANNER;
use crate::block_verification::{
check_block_is_finalized_checkpoint_or_descendant, check_block_relevancy,
signature_verify_chain_segment, verify_header_signature, BlockError, ExecutionPendingBlock,
check_block_relevancy, signature_verify_chain_segment, verify_header_signature,
BlobProcessError, BlockError, BlockImportError, BlockProcessError, ExecutionPendingBlock,
GossipVerifiedBlock, IntoExecutionPendingBlock,
};
use crate::block_verification_types::{
Expand Down Expand Up @@ -2778,18 +2778,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
}
Err(BlockError::BlockIsAlreadyKnown(block_root)) => {
Err(BlockProcessError::BlockError(BlockError::BlockIsAlreadyKnown(
block_root,
))) => {
debug!(self.log,
"Ignoring already known blocks while processing chain segment";
"block_root" => ?block_root);
continue;
}
Err(error) => {
Err(BlockProcessError::BlockError(error)) => {
return ChainSegmentResult::Failed {
imported_blocks,
error,
};
}
Err(other) => {
todo!();
}
}
}
}
Expand Down Expand Up @@ -2858,7 +2863,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn process_gossip_blob(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
) -> Result<AvailabilityProcessingStatus, BlobProcessError> {
let block_root = blob.block_root();

// If this block has already been imported to forkchoice it must have been available, so
Expand All @@ -2868,7 +2873,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(blob.block_root()));
return Err(BlobProcessError::AlreadyImported(blob.block_root()));
}

if let Some(event_handler) = self.event_handler.as_ref() {
Expand All @@ -2890,15 +2895,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
) -> Result<AvailabilityProcessingStatus, BlobProcessError> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::BlockIsAlreadyKnown(block_root));
return Err(BlobProcessError::AlreadyImported(block_root));
}

if let Some(event_handler) = self.event_handler.as_ref() {
Expand All @@ -2919,11 +2924,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified(
fn remove_notified<E>(
&self,
block_root: &Hash256,
r: Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
r: Result<AvailabilityProcessingStatus, E>,
) -> Result<AvailabilityProcessingStatus, E> {
let has_missing_components =
matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)));
if !has_missing_components {
Expand All @@ -2939,7 +2944,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
unverified_block: B,
notify_execution_layer: NotifyExecutionLayer,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
) -> Result<AvailabilityProcessingStatus, BlockProcessError<T::EthSpec>> {
self.reqresp_pre_import_cache
.write()
.insert(block_root, unverified_block.block_cloned());
Expand Down Expand Up @@ -2971,7 +2976,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
unverified_block: B,
notify_execution_layer: NotifyExecutionLayer,
publish_fn: impl FnOnce() -> Result<(), BlockError<T::EthSpec>> + Send + 'static,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
) -> Result<AvailabilityProcessingStatus, BlockProcessError<T::EthSpec>> {
// Start the Prometheus timer.
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);

Expand Down Expand Up @@ -3012,9 +3017,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

match executed_block {
ExecutedBlock::Available(block) => {
self.import_available_block(Box::new(block)).await
}
ExecutedBlock::Available(block) => self
.import_available_block(Box::new(block))
.await
.map_err(|e| e.into()),
ExecutedBlock::AvailabilityPending(block) => {
self.check_block_availability_and_import(block).await
}
Expand Down Expand Up @@ -3047,7 +3053,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

Ok(status)
}
Err(e @ BlockError::BeaconChainError(BeaconChainError::TokioJoin(_))) => {
Err(e @ BlockProcessError::BeaconChainError(BeaconChainError::TokioJoin(_))) => {
debug!(
self.log,
"Beacon block processing cancelled";
Expand All @@ -3057,20 +3063,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
// There was an error whilst attempting to verify and import the block. The block might
// be partially verified or partially imported.
Err(BlockError::BeaconChainError(e)) => {
Err(BlockProcessError::BeaconChainError(e)) => {
crit!(
self.log,
"Beacon block processing error";
"error" => ?e,
);
Err(BlockError::BeaconChainError(e))
Err(BlockProcessError::BeaconChainError(e))
}
// The block failed verification.
Err(other) => {
debug!(
self.log,
"Beacon block rejected";
"reason" => other.to_string(),
"reason" => ?other, // TODO
);
Err(other)
}
Expand Down Expand Up @@ -3139,27 +3145,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
async fn check_block_availability_and_import(
self: &Arc<Self>,
block: AvailabilityPendingExecutedBlock<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
) -> Result<AvailabilityProcessingStatus, BlockProcessError<T::EthSpec>> {
let slot = block.block.slot();
let availability = self
.data_availability_checker
.put_pending_executed_block(block)?;
self.process_availability(slot, availability).await
self.process_availability(slot, availability)
.await
.map_err(|e| e.into())
}

/// Checks if the provided blob can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blob in the data availability checker.
async fn check_gossip_blob_availability_and_import(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
) -> Result<AvailabilityProcessingStatus, BlobProcessError> {
let slot = blob.slot();
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(blob.signed_block_header());
}
let availability = self.data_availability_checker.put_gossip_blob(blob)?;

self.process_availability(slot, availability).await
self.process_availability(slot, availability)
.await
.map_err(|e| e.into())
}

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
Expand All @@ -3169,7 +3179,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
) -> Result<AvailabilityProcessingStatus, BlobProcessError> {
// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
Expand All @@ -3186,7 +3196,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
header.message.proposer_index,
block_root,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
.map_err(|e| BlobProcessError::BeaconChainError(e.into()))?;
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(header);
}
Expand All @@ -3197,7 +3207,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_rpc_blobs(block_root, blobs)?;

self.process_availability(slot, availability).await
self.process_availability(slot, availability)
.await
.map_err(|e| e.into())
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
Expand All @@ -3208,7 +3220,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
slot: Slot,
availability: Availability<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
) -> Result<AvailabilityProcessingStatus, BlockImportError> {
match availability {
Availability::Available(block) => {
// Block is fully available, import into fork choice
Expand All @@ -3223,7 +3235,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn import_available_block(
self: &Arc<Self>,
block: Box<AvailableExecutedBlock<T::EthSpec>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
) -> Result<AvailabilityProcessingStatus, BlockImportError> {
let AvailableExecutedBlock {
block,
import_data,
Expand Down Expand Up @@ -3286,7 +3298,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
parent_eth1_finalization_data: Eth1FinalizationData,
mut consensus_context: ConsensusContext<T::EthSpec>,
) -> Result<Hash256, BlockError<T::EthSpec>> {
) -> Result<Hash256, BlockImportError> {
// ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
// Everything in this initial section is on the hot path between processing the block and
// being able to attest to it. DO NOT add any extra processing in this initial section
Expand Down Expand Up @@ -3331,8 +3343,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut fork_choice = self.canonical_head.fork_choice_write_lock();

// Do not import a block that doesn't descend from the finalized root.
let signed_block =
check_block_is_finalized_checkpoint_or_descendant(self, &fork_choice, signed_block)?;
// Note: fork_choice.on_block() below checks that parent_root is known and is descendant of
// finalized checkpoint root. Block validation already checks that parent is known before
// importing the block. There maybe a race condition if we advance our finalized checkpoint
// between validating the block and importing it. In that case this block is useless and we
// can consider the failure internal i.e. a `BeaconChainError`.
let block = signed_block.message();

// Register the new block with the fork choice service.
Expand All @@ -3354,7 +3369,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
payload_verification_status,
&self.spec,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
.map_err(|e| BlockImportError::BeaconChainError(e.into()))?;
}

// If the block is recent enough and it was not optimistically imported, check to see if it
Expand Down Expand Up @@ -3503,10 +3518,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"error" => ?e,
"warning" => "The database is likely corrupt now, consider --purge-db"
);
return Err(BlockError::BeaconChainError(e));
return Err(BlockImportError::BeaconChainError(e));
}

return Err(e.into());
return Err(BlockImportError::BeaconChainError(e.into()));
}
drop(txn_lock);

Expand Down Expand Up @@ -3580,7 +3595,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block: BeaconBlockRef<T::EthSpec>,
block_root: Hash256,
state: &BeaconState<T::EthSpec>,
) -> Result<(), BlockError<T::EthSpec>> {
) -> Result<(), BlockImportError> {
// Only perform the weak subjectivity check if it was configured.
let Some(wss_checkpoint) = self.config.weak_subjectivity_checkpoint else {
return Ok(());
Expand Down Expand Up @@ -3625,11 +3640,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Provided block root is not a checkpoint.",
))
.map_err(|err| {
BlockError::BeaconChainError(
BlockImportError::BeaconChainError(
BeaconChainError::WeakSubjectivtyShutdownError(err),
)
})?;
return Err(BlockError::WeakSubjectivityConflict);
return Err(BlockImportError::WeakSubjectivityConflict);
}
}
Ok(())
Expand Down
86 changes: 86 additions & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,92 @@ const MAXIMUM_BLOCK_SLOT_NUMBER: u64 = 4_294_967_296; // 2^32
/// Only useful for testing.
const WRITE_BLOCK_PROCESSING_SSZ: bool = cfg!(feature = "write_ssz_files");

#[derive(Debug)]
pub enum BlobProcessError {
BeaconChainError(BeaconChainError),
ImportError(BlockImportError),
AvailabilityCheckError(AvailabilityCheckError),
AlreadyImported(Hash256),
}

impl From<BeaconChainError> for BlobProcessError {
fn from(e: BeaconChainError) -> Self {
Self::BeaconChainError(e)
}
}

impl From<BlockImportError> for BlobProcessError {
fn from(e: BlockImportError) -> Self {
Self::ImportError(e)
}
}

impl From<AvailabilityCheckError> for BlobProcessError {
fn from(e: AvailabilityCheckError) -> Self {
Self::AvailabilityCheckError(e)
}
}

#[derive(Debug)]
pub enum BlockProcessError<E: EthSpec> {
BlockError(BlockError<E>),
BeaconChainError(BeaconChainError),
ImportError(BlockImportError),
AvailabilityCheckError(AvailabilityCheckError),
}

impl<E: EthSpec> From<BlockError<E>> for BlockProcessError<E> {
fn from(e: BlockError<E>) -> Self {
match e {
BlockError::BeaconChainError(e) => Self::BeaconChainError(e),
e => Self::BlockError(e),
}
}
}

impl<E: EthSpec> From<BeaconChainError> for BlockProcessError<E> {
fn from(e: BeaconChainError) -> Self {
Self::BeaconChainError(e)
}
}

impl<E: EthSpec> From<BlockImportError> for BlockProcessError<E> {
fn from(e: BlockImportError) -> Self {
Self::ImportError(e)
}
}

impl<E: EthSpec> From<AvailabilityCheckError> for BlockProcessError<E> {
fn from(e: AvailabilityCheckError) -> Self {
Self::AvailabilityCheckError(e)
}
}

#[derive(Debug)]
pub enum BlockImportError {
/// There was an error whilst processing the block. It is not necessarily invalid.
///
/// ## Peer scoring
///
/// We were unable to process this block due to an internal error. It's unclear if the block is
/// valid.
BeaconChainError(BeaconChainError),
/// There was an error whilst verifying weak subjectivity. This block conflicts with the
/// configured weak subjectivity checkpoint and was not imported.
///
/// ## Peer scoring
///
/// The block is invalid and the peer is faulty.
WeakSubjectivityConflict,
AvailabilityCheckError(AvailabilityCheckError),
}

impl From<BeaconChainError> for BlockImportError {
fn from(e: BeaconChainError) -> Self {
BlockImportError::BeaconChainError(e)
}
}

/// Returned when a block was not verified. A block is not verified for two reasons:
///
/// - The block is malformed/invalid (indicated by all results other than `BeaconChainError`.
Expand Down
Loading

0 comments on commit b0797fa

Please sign in to comment.