From 279270533128ef8069765d04c280db993db12905 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Tue, 24 Sep 2024 14:52:44 +1000 Subject: [PATCH] Lenient duplicate checks on HTTP API for block publication (#5574) * start splitting gossip verification * WIP * Gossip verify separate (#7) * save * save * make ProvenancedBlock concrete * delete into gossip verified block contents * get rid of IntoBlobSidecar trait * remove IntoGossipVerified trait * get tests compiling * don't check sidecar slashability in publish * remove second publish closure * drop blob bool. also prefer using message index over index of position in list * Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate * Fix low-hanging tests * Fix tests and clean up * Clean up imports * more cleanup * Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate * Further refine behaviour and add tests * Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate * Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate * Remove empty line * Fix test (block is not fully imported just gossip verified) * Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate * Update for unstable & use empty blob list * Update comment * Add test for duplicate block case * Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate * Clarify unreachable case * Fix another publish_block case * Remove unreachable case in filter chain segment * Revert unrelated blob optimisation * Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate * Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate * Fix merge conflicts * Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate * Fix some compilation issues. Impl is fucked though * Support peerDAS * Fix tests * Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate * Fix conflict * Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate * Address review comments * Merge remote-tracking branch 'origin/unstable' into gossip-verify-separate --- beacon_node/beacon_chain/src/beacon_chain.rs | 47 +- .../beacon_chain/src/block_verification.rs | 155 ++--- .../src/block_verification_types.rs | 72 +- beacon_node/beacon_chain/src/lib.rs | 6 +- beacon_node/beacon_chain/src/test_utils.rs | 4 +- .../beacon_chain/tests/block_verification.rs | 8 +- beacon_node/beacon_chain/tests/events.rs | 2 +- beacon_node/http_api/src/lib.rs | 8 +- beacon_node/http_api/src/publish_blocks.rs | 657 ++++++++++++------ beacon_node/http_api/src/test_utils.rs | 20 +- .../tests/broadcast_validation_tests.rs | 507 ++++++++++++-- beacon_node/http_api/tests/fork_tests.rs | 1 + .../http_api/tests/interactive_tests.rs | 2 + .../gossip_methods.rs | 16 +- .../network_beacon_processor/sync_methods.rs | 7 +- .../network/src/sync/block_lookups/mod.rs | 12 +- .../network/src/sync/block_lookups/tests.rs | 4 +- .../network/src/sync/network_context.rs | 4 +- common/eth2/src/types.rs | 36 - consensus/types/src/data_column_sidecar.rs | 1 + testing/ef_tests/src/cases/fork_choice.rs | 4 +- 21 files changed, 1064 insertions(+), 509 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 515b65b1af7..5d287e2b68a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2740,7 +2740,10 @@ impl BeaconChain { // If the block is relevant, add it to the filtered chain segment. Ok(_) => filtered_chain_segment.push((block_root, block)), // If the block is already known, simply ignore this block. - Err(BlockError::BlockIsAlreadyKnown(_)) => continue, + // + // Note that `check_block_relevancy` is incapable of returning + // `DuplicateImportStatusUnknown` so we don't need to handle that case here. + Err(BlockError::DuplicateFullyImported(_)) => continue, // If the block is the genesis block, simply ignore this block. Err(BlockError::GenesisBlock) => continue, // If the block is is for a finalized slot, simply ignore this block. @@ -2886,7 +2889,7 @@ impl BeaconChain { } } } - Err(BlockError::BlockIsAlreadyKnown(block_root)) => { + Err(BlockError::DuplicateFullyImported(block_root)) => { debug!(self.log, "Ignoring already known blocks while processing chain segment"; "block_root" => ?block_root); @@ -2977,6 +2980,7 @@ impl BeaconChain { pub async fn process_gossip_blob( self: &Arc, blob: GossipVerifiedBlob, + publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result { let block_root = blob.block_root(); @@ -2987,7 +2991,7 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&block_root) { - return Err(BlockError::BlockIsAlreadyKnown(blob.block_root())); + return Err(BlockError::DuplicateFullyImported(blob.block_root())); } // No need to process and import blobs beyond the PeerDAS epoch. @@ -3003,7 +3007,9 @@ impl BeaconChain { } } - let r = self.check_gossip_blob_availability_and_import(blob).await; + let r = self + .check_gossip_blob_availability_and_import(blob, publish_fn) + .await; self.remove_notified(&block_root, r) } @@ -3012,6 +3018,7 @@ impl BeaconChain { pub async fn process_gossip_data_columns( self: &Arc, data_columns: Vec>, + publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result< ( AvailabilityProcessingStatus, @@ -3037,11 +3044,16 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&block_root) { - return Err(BlockError::BlockIsAlreadyKnown(block_root)); + return Err(BlockError::DuplicateFullyImported(block_root)); } let r = self - .check_gossip_data_columns_availability_and_import(slot, block_root, data_columns) + .check_gossip_data_columns_availability_and_import( + slot, + block_root, + data_columns, + publish_fn, + ) .await; self.remove_notified_custody_columns(&block_root, r) } @@ -3061,7 +3073,7 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&block_root) { - return Err(BlockError::BlockIsAlreadyKnown(block_root)); + return Err(BlockError::DuplicateFullyImported(block_root)); } // Reject RPC blobs referencing unknown parents. Otherwise we allow potentially invalid data @@ -3127,7 +3139,7 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&block_root) { - return Err(BlockError::BlockIsAlreadyKnown(block_root)); + return Err(BlockError::DuplicateFullyImported(block_root)); } // Reject RPC columns referencing unknown parents. Otherwise we allow potentially invalid data @@ -3225,7 +3237,7 @@ impl BeaconChain { unverified_block: B, notify_execution_layer: NotifyExecutionLayer, block_source: BlockImportSource, - publish_fn: impl FnOnce() -> Result<(), BlockError> + Send + 'static, + publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result { // Start the Prometheus timer. let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES); @@ -3407,7 +3419,8 @@ impl BeaconChain { let availability = self .data_availability_checker .put_pending_executed_block(block)?; - self.process_availability(slot, availability).await + self.process_availability(slot, availability, || Ok(())) + .await } /// Checks if the provided blob can make any cached blocks available, and imports immediately @@ -3415,6 +3428,7 @@ impl BeaconChain { async fn check_gossip_blob_availability_and_import( self: &Arc, blob: GossipVerifiedBlob, + publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result { let slot = blob.slot(); if let Some(slasher) = self.slasher.as_ref() { @@ -3422,7 +3436,8 @@ impl BeaconChain { } let availability = self.data_availability_checker.put_gossip_blob(blob)?; - self.process_availability(slot, availability).await + self.process_availability(slot, availability, publish_fn) + .await } /// Checks if the provided data column can make any cached blocks available, and imports immediately @@ -3432,6 +3447,7 @@ impl BeaconChain { slot: Slot, block_root: Hash256, data_columns: Vec>, + publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result< ( AvailabilityProcessingStatus, @@ -3449,7 +3465,7 @@ impl BeaconChain { .data_availability_checker .put_gossip_data_columns(slot, block_root, data_columns)?; - self.process_availability(slot, availability) + self.process_availability(slot, availability, publish_fn) .await .map(|result| (result, data_columns_to_publish)) } @@ -3490,7 +3506,8 @@ impl BeaconChain { .data_availability_checker .put_rpc_blobs(block_root, epoch, blobs)?; - self.process_availability(slot, availability).await + self.process_availability(slot, availability, || Ok(())) + .await } /// Checks if the provided columns can make any cached blocks available, and imports immediately @@ -3538,7 +3555,7 @@ impl BeaconChain { custody_columns, )?; - self.process_availability(slot, availability) + self.process_availability(slot, availability, || Ok(())) .await .map(|result| (result, data_columns_to_publish)) } @@ -3551,9 +3568,11 @@ impl BeaconChain { self: &Arc, slot: Slot, availability: Availability, + publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result { match availability { Availability::Available(block) => { + publish_fn()?; // Block is fully available, import into fork choice self.import_available_block(block).await } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 027c013a497..a8233f170f6 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -49,14 +49,10 @@ #![allow(clippy::result_large_err)] use crate::beacon_snapshot::PreProcessingSnapshot; -use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob, GossipVerifiedBlobList}; -use crate::block_verification_types::{ - AsBlock, BlockContentsError, BlockImportData, GossipVerifiedBlockContents, RpcBlock, -}; +use crate::blob_verification::GossipBlobError; +use crate::block_verification_types::{AsBlock, BlockImportData, RpcBlock}; use crate::data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock}; -use crate::data_column_verification::{ - GossipDataColumnError, GossipVerifiedDataColumn, GossipVerifiedDataColumnList, -}; +use crate::data_column_verification::GossipDataColumnError; use crate::eth1_finalization_cache::Eth1FinalizationData; use crate::execution_payload::{ is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block, @@ -71,7 +67,7 @@ use crate::{ metrics, BeaconChain, BeaconChainError, BeaconChainTypes, }; use derivative::Derivative; -use eth2::types::{BlockGossip, EventKind, PublishBlockRequest}; +use eth2::types::{BlockGossip, EventKind}; use execution_layer::PayloadStatus; pub use fork_choice::{AttestationFromBlock, PayloadVerificationStatus}; use lighthouse_metrics::TryExt; @@ -82,7 +78,6 @@ use slog::{debug, error, warn, Logger}; use slot_clock::SlotClock; use ssz::Encode; use ssz_derive::{Decode, Encode}; -use ssz_types::VariableList; use state_processing::per_block_processing::{errors::IntoWithIndex, is_merge_transition_block}; use state_processing::{ block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError}, @@ -98,14 +93,12 @@ use std::io::Write; use std::sync::Arc; use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp}; use task_executor::JoinHandle; -use types::data_column_sidecar::DataColumnSidecarError; use types::{ - BeaconBlockRef, BeaconState, BeaconStateError, BlobsList, ChainSpec, DataColumnSubnetId, Epoch, - EthSpec, ExecutionBlockHash, FullPayload, Hash256, InconsistentFork, KzgProofs, PublicKey, - PublicKeyBytes, RelativeEpoch, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, - Slot, + data_column_sidecar::DataColumnSidecarError, BeaconBlockRef, BeaconState, BeaconStateError, + BlobsList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, ExecPayload, ExecutionBlockHash, + FullPayload, Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch, + SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; -use types::{BlobSidecar, ExecPayload}; pub const POS_PANDA_BANNER: &str = r#" ,,, ,,, ,,, ,,, @@ -187,12 +180,18 @@ pub enum BlockError { /// It's unclear if this block is valid, but it conflicts with finality and shouldn't be /// imported. NotFinalizedDescendant { block_parent_root: Hash256 }, - /// Block is already known, no need to re-import. + /// Block is already known and valid, no need to re-import. /// /// ## Peer scoring /// /// The block is valid and we have already imported a block with this hash. - BlockIsAlreadyKnown(Hash256), + DuplicateFullyImported(Hash256), + /// Block has already been seen on gossip but has not necessarily finished being imported. + /// + /// ## Peer scoring + /// + /// The block could be valid, or invalid. We don't know. + DuplicateImportStatusUnknown(Hash256), /// The block slot exceeds the MAXIMUM_BLOCK_SLOT_NUMBER. /// /// ## Peer scoring @@ -704,115 +703,57 @@ pub struct ExecutionPendingBlock { pub payload_verification_handle: PayloadVerificationHandle, } -pub trait IntoGossipVerifiedBlockContents: Sized { +pub trait IntoGossipVerifiedBlock: Sized { fn into_gossip_verified_block( self, chain: &BeaconChain, - ) -> Result, BlockContentsError>; - fn inner_block(&self) -> &SignedBeaconBlock; + ) -> Result, BlockError>; + fn inner_block(&self) -> Arc>; } -impl IntoGossipVerifiedBlockContents for GossipVerifiedBlockContents { +impl IntoGossipVerifiedBlock for GossipVerifiedBlock { fn into_gossip_verified_block( self, _chain: &BeaconChain, - ) -> Result, BlockContentsError> { + ) -> Result, BlockError> { Ok(self) } - fn inner_block(&self) -> &SignedBeaconBlock { - self.0.block.as_block() + fn inner_block(&self) -> Arc> { + self.block_cloned() } } -impl IntoGossipVerifiedBlockContents for PublishBlockRequest { +impl IntoGossipVerifiedBlock for Arc> { fn into_gossip_verified_block( self, chain: &BeaconChain, - ) -> Result, BlockContentsError> { - let (block, blobs) = self.deconstruct(); - let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch()); - - let (gossip_verified_blobs, gossip_verified_data_columns) = if peer_das_enabled { - let gossip_verified_data_columns = - build_gossip_verified_data_columns(chain, &block, blobs.map(|(_, blobs)| blobs))?; - (None, gossip_verified_data_columns) - } else { - let gossip_verified_blobs = build_gossip_verified_blobs(chain, &block, blobs)?; - (gossip_verified_blobs, None) - }; - - let gossip_verified_block = GossipVerifiedBlock::new(block, chain)?; - - Ok(( - gossip_verified_block, - gossip_verified_blobs, - gossip_verified_data_columns, - )) + ) -> Result, BlockError> { + GossipVerifiedBlock::new(self, chain) } - fn inner_block(&self) -> &SignedBeaconBlock { - self.signed_block() + fn inner_block(&self) -> Arc> { + self.clone() } } -#[allow(clippy::type_complexity)] -fn build_gossip_verified_blobs( - chain: &BeaconChain, - block: &Arc>>, - blobs: Option<(KzgProofs, BlobsList)>, -) -> Result>, BlockContentsError> { - blobs - .map(|(kzg_proofs, blobs)| { - let mut gossip_verified_blobs = vec![]; - for (i, (kzg_proof, blob)) in kzg_proofs.iter().zip(blobs).enumerate() { - let _timer = - metrics::start_timer(&metrics::BLOB_SIDECAR_INCLUSION_PROOF_COMPUTATION); - let blob = BlobSidecar::new(i, blob, block, *kzg_proof) - .map_err(BlockContentsError::BlobSidecarError)?; - drop(_timer); - let gossip_verified_blob = - GossipVerifiedBlob::new(Arc::new(blob), i as u64, chain)?; - gossip_verified_blobs.push(gossip_verified_blob); - } - let gossip_verified_blobs = VariableList::from(gossip_verified_blobs); - Ok::<_, BlockContentsError>(gossip_verified_blobs) - }) - .transpose() -} - -fn build_gossip_verified_data_columns( +pub fn build_blob_data_column_sidecars( chain: &BeaconChain, block: &SignedBeaconBlock>, - blobs: Option>, -) -> Result>, BlockContentsError> { - blobs - // Only attempt to build data columns if blobs is non empty to avoid skewing the metrics. - .filter(|b| !b.is_empty()) - .map(|blobs| { - let mut timer = metrics::start_timer_vec( - &metrics::DATA_COLUMN_SIDECAR_COMPUTATION, - &[&blobs.len().to_string()], - ); - let sidecars = blobs_to_data_column_sidecars(&blobs, block, &chain.kzg, &chain.spec) - .discard_timer_on_break(&mut timer)?; - drop(timer); - let mut gossip_verified_data_columns = vec![]; - for sidecar in sidecars { - let subnet = DataColumnSubnetId::from_column_index::( - sidecar.index as usize, - &chain.spec, - ); - let column = GossipVerifiedDataColumn::new(sidecar, subnet.into(), chain)?; - gossip_verified_data_columns.push(column); - } - let gossip_verified_data_columns = RuntimeVariableList::new( - gossip_verified_data_columns, - chain.spec.number_of_columns, - ) - .map_err(DataColumnSidecarError::SszError)?; - Ok::<_, BlockContentsError>(gossip_verified_data_columns) - }) - .transpose() + blobs: BlobsList, +) -> Result, DataColumnSidecarError> { + // Only attempt to build data columns if blobs is non empty to avoid skewing the metrics. + if blobs.is_empty() { + return Ok(vec![]); + } + + let mut timer = metrics::start_timer_vec( + &metrics::DATA_COLUMN_SIDECAR_COMPUTATION, + &[&blobs.len().to_string()], + ); + let sidecars = blobs_to_data_column_sidecars(&blobs, block, &chain.kzg, &chain.spec) + .discard_timer_on_break(&mut timer)?; + drop(timer); + Ok(sidecars) } /// Implemented on types that can be converted into a `ExecutionPendingBlock`. @@ -912,7 +853,7 @@ impl GossipVerifiedBlock { // already know this block. let fork_choice_read_lock = chain.canonical_head.fork_choice_read_lock(); if fork_choice_read_lock.contains_block(&block_root) { - return Err(BlockError::BlockIsAlreadyKnown(block_root)); + return Err(BlockError::DuplicateFullyImported(block_root)); } // Do not process a block that doesn't descend from the finalized root. @@ -1046,7 +987,9 @@ impl GossipVerifiedBlock { SeenBlock::Slashable => { return Err(BlockError::Slashable); } - SeenBlock::Duplicate => return Err(BlockError::BlockIsAlreadyKnown(block_root)), + SeenBlock::Duplicate => { + return Err(BlockError::DuplicateImportStatusUnknown(block_root)) + } SeenBlock::UniqueNonSlashable => {} }; @@ -1894,7 +1837,7 @@ pub fn check_block_relevancy( .fork_choice_read_lock() .contains_block(&block_root) { - return Err(BlockError::BlockIsAlreadyKnown(block_root)); + return Err(BlockError::DuplicateFullyImported(block_root)); } Ok(block_root) diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 707dfa56d84..420c83081c7 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -1,19 +1,14 @@ -use crate::blob_verification::{GossipBlobError, GossipVerifiedBlobList}; -use crate::block_verification::BlockError; use crate::data_availability_checker::AvailabilityCheckError; pub use crate::data_availability_checker::{AvailableBlock, MaybeAvailableBlock}; -use crate::data_column_verification::{ - CustodyDataColumn, CustodyDataColumnList, GossipDataColumnError, GossipVerifiedDataColumnList, -}; +use crate::data_column_verification::{CustodyDataColumn, CustodyDataColumnList}; use crate::eth1_finalization_cache::Eth1FinalizationData; -use crate::{get_block_root, GossipVerifiedBlock, PayloadVerificationOutcome}; +use crate::{get_block_root, PayloadVerificationOutcome}; use derivative::Derivative; use ssz_types::VariableList; use state_processing::ConsensusContext; use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use types::blob_sidecar::{self, BlobIdentifier, FixedBlobSidecarList}; -use types::data_column_sidecar::{self}; +use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; use types::{ BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, @@ -390,67 +385,6 @@ impl BlockImportData { } } -pub type GossipVerifiedBlockContents = ( - GossipVerifiedBlock, - Option>, - Option>, -); - -#[derive(Debug)] -pub enum BlockContentsError { - BlockError(BlockError), - BlobError(GossipBlobError), - BlobSidecarError(blob_sidecar::BlobSidecarError), - DataColumnError(GossipDataColumnError), - DataColumnSidecarError(data_column_sidecar::DataColumnSidecarError), -} - -impl From for BlockContentsError { - fn from(value: BlockError) -> Self { - Self::BlockError(value) - } -} - -impl From for BlockContentsError { - fn from(value: GossipBlobError) -> Self { - Self::BlobError(value) - } -} - -impl From for BlockContentsError { - fn from(value: GossipDataColumnError) -> Self { - Self::DataColumnError(value) - } -} - -impl From for BlockContentsError { - fn from(value: data_column_sidecar::DataColumnSidecarError) -> Self { - Self::DataColumnSidecarError(value) - } -} - -impl std::fmt::Display for BlockContentsError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - BlockContentsError::BlockError(err) => { - write!(f, "BlockError({})", err) - } - BlockContentsError::BlobError(err) => { - write!(f, "BlobError({})", err) - } - BlockContentsError::BlobSidecarError(err) => { - write!(f, "BlobSidecarError({:?})", err) - } - BlockContentsError::DataColumnError(err) => { - write!(f, "DataColumnError({:?})", err) - } - BlockContentsError::DataColumnSidecarError(err) => { - write!(f, "DataColumnSidecarError({:?})", err) - } - } - } -} - /// Trait for common block operations. pub trait AsBlock { fn slot(&self) -> Slot; diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 7bfb5b08beb..b89c00e0af4 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -75,9 +75,9 @@ pub use self::historical_blocks::HistoricalBlockError; pub use attestation_verification::Error as AttestationError; pub use beacon_fork_choice_store::{BeaconForkChoiceStore, Error as ForkChoiceStoreError}; pub use block_verification::{ - get_block_root, BlockError, ExecutionPayloadError, ExecutionPendingBlock, GossipVerifiedBlock, - IntoExecutionPendingBlock, IntoGossipVerifiedBlockContents, PayloadVerificationOutcome, - PayloadVerificationStatus, + build_blob_data_column_sidecars, get_block_root, BlockError, ExecutionPayloadError, + ExecutionPendingBlock, GossipVerifiedBlock, IntoExecutionPendingBlock, IntoGossipVerifiedBlock, + PayloadVerificationOutcome, PayloadVerificationStatus, }; pub use block_verification_types::AvailabilityPendingExecutedBlock; pub use block_verification_types::ExecutedBlock; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 582d20637b3..ce36c8ca216 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -878,7 +878,7 @@ where let proposer_index = state.get_beacon_proposer_index(slot, &self.spec).unwrap(); // If we produce two blocks for the same slot, they hash up to the same value and - // BeaconChain errors out with `BlockIsAlreadyKnown`. Vary the graffiti so that we produce + // BeaconChain errors out with `DuplicateFullyImported`. Vary the graffiti so that we produce // different blocks each time. let graffiti = Graffiti::from(self.rng.lock().gen::<[u8; 32]>()); @@ -940,7 +940,7 @@ where let proposer_index = state.get_beacon_proposer_index(slot, &self.spec).unwrap(); // If we produce two blocks for the same slot, they hash up to the same value and - // BeaconChain errors out with `BlockIsAlreadyKnown`. Vary the graffiti so that we produce + // BeaconChain errors out with `DuplicateFullyImported`. Vary the graffiti so that we produce // different blocks each time. let graffiti = Graffiti::from(self.rng.lock().gen::<[u8; 32]>()); diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 535d63427a1..d239f5089aa 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -976,7 +976,7 @@ async fn block_gossip_verification() { harness .chain - .process_gossip_blob(gossip_verified) + .process_gossip_blob(gossip_verified, || Ok(())) .await .expect("should import valid gossip verified blob"); } @@ -1173,7 +1173,7 @@ async fn block_gossip_verification() { assert!( matches!( unwrap_err(harness.chain.verify_block_for_gossip(Arc::new(block.clone())).await), - BlockError::BlockIsAlreadyKnown(_), + BlockError::DuplicateImportStatusUnknown(_), ), "should register any valid signature against the proposer, even if the block failed later verification" ); @@ -1201,7 +1201,7 @@ async fn block_gossip_verification() { .verify_block_for_gossip(block.clone()) .await .expect_err("should error when processing known block"), - BlockError::BlockIsAlreadyKnown(_) + BlockError::DuplicateImportStatusUnknown(_) ), "the second proposal by this validator should be rejected" ); @@ -1247,7 +1247,7 @@ async fn verify_block_for_gossip_slashing_detection() { .unwrap(); harness .chain - .process_gossip_blob(verified_blob) + .process_gossip_blob(verified_blob, || Ok(())) .await .unwrap(); } diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs index ab784d3be45..31e69f0524e 100644 --- a/beacon_node/beacon_chain/tests/events.rs +++ b/beacon_node/beacon_chain/tests/events.rs @@ -35,7 +35,7 @@ async fn blob_sidecar_event_on_process_gossip_blob() { let _ = harness .chain - .process_gossip_blob(gossip_verified_blob) + .process_gossip_blob(gossip_verified_blob, || Ok(())) .await .unwrap(); diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 15d463b661f..ffcfda46803 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1298,7 +1298,7 @@ pub fn serve( task_spawner.spawn_async_with_rejection(Priority::P0, async move { publish_blocks::publish_block( None, - ProvenancedBlock::local(block_contents), + ProvenancedBlock::local_from_publish_request(block_contents), chain, &network_tx, log, @@ -1340,7 +1340,7 @@ pub fn serve( })?; publish_blocks::publish_block( None, - ProvenancedBlock::local(block_contents), + ProvenancedBlock::local_from_publish_request(block_contents), chain, &network_tx, log, @@ -1375,7 +1375,7 @@ pub fn serve( task_spawner.spawn_async_with_rejection(Priority::P0, async move { publish_blocks::publish_block( None, - ProvenancedBlock::local(block_contents), + ProvenancedBlock::local_from_publish_request(block_contents), chain, &network_tx, log, @@ -1419,7 +1419,7 @@ pub fn serve( })?; publish_blocks::publish_block( None, - ProvenancedBlock::local(block_contents), + ProvenancedBlock::local_from_publish_request(block_contents), chain, &network_tx, log, diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index ad7cb3081ea..16364b435a9 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -1,13 +1,17 @@ use crate::metrics; -use beacon_chain::block_verification_types::{AsBlock, BlockContentsError}; +use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; +use beacon_chain::block_verification_types::AsBlock; +use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use beacon_chain::validator_monitor::{get_block_delay_ms, timestamp_now}; use beacon_chain::{ - AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, - IntoGossipVerifiedBlockContents, NotifyExecutionLayer, + build_blob_data_column_sidecars, AvailabilityProcessingStatus, BeaconChain, BeaconChainError, + BeaconChainTypes, BlockError, IntoGossipVerifiedBlock, NotifyExecutionLayer, +}; +use eth2::types::{ + BlobsBundle, BroadcastValidation, ErrorMessage, ExecutionPayloadAndBlobs, FullPayloadContents, + PublishBlockRequest, SignedBlockContents, }; -use eth2::types::{into_full_block_and_blobs, BroadcastValidation, ErrorMessage}; -use eth2::types::{FullPayloadContents, PublishBlockRequest}; use execution_layer::ProvenancedPayload; use lighthouse_network::{NetworkGlobals, PubsubMessage}; use network::NetworkMessage; @@ -15,39 +19,62 @@ use rand::seq::SliceRandom; use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; use std::marker::PhantomData; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; use tree_hash::TreeHash; use types::{ - AbstractExecPayload, BeaconBlockRef, BlobSidecarList, BlockImportSource, DataColumnSidecarList, - DataColumnSubnetId, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, FullPayload, - FullPayloadBellatrix, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, VariableList, + AbstractExecPayload, BeaconBlockRef, BlobSidecar, BlobsList, BlockImportSource, + DataColumnSidecarList, DataColumnSubnetId, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, + FullPayload, FullPayloadBellatrix, Hash256, KzgProofs, SignedBeaconBlock, + SignedBlindedBeaconBlock, }; use warp::http::StatusCode; use warp::{reply::Response, Rejection, Reply}; -pub enum ProvenancedBlock> { +pub type UnverifiedBlobs = Option<( + KzgProofs<::EthSpec>, + BlobsList<::EthSpec>, +)>; + +pub enum ProvenancedBlock> { /// The payload was built using a local EE. - Local(B, PhantomData), + Local(B, UnverifiedBlobs, PhantomData), /// The payload was build using a remote builder (e.g., via a mev-boost /// compatible relay). - Builder(B, PhantomData), + Builder(B, UnverifiedBlobs, PhantomData), } -impl> ProvenancedBlock { - pub fn local(block: B) -> Self { - Self::Local(block, PhantomData) +impl> ProvenancedBlock { + pub fn local(block: B, blobs: UnverifiedBlobs) -> Self { + Self::Local(block, blobs, PhantomData) } - pub fn builder(block: B) -> Self { - Self::Builder(block, PhantomData) + pub fn builder(block: B, blobs: UnverifiedBlobs) -> Self { + Self::Builder(block, blobs, PhantomData) + } +} + +impl ProvenancedBlock>> { + pub fn local_from_publish_request(request: PublishBlockRequest) -> Self { + match request { + PublishBlockRequest::Block(block) => Self::local(block, None), + PublishBlockRequest::BlockContents(block_contents) => { + let SignedBlockContents { + signed_block, + kzg_proofs, + blobs, + } = block_contents; + Self::local(signed_block, Some((kzg_proofs, blobs))) + } + } } } /// Handles a request from the HTTP API for full blocks. #[allow(clippy::too_many_arguments)] -pub async fn publish_block>( +pub async fn publish_block>( block_root: Option, provenanced_block: ProvenancedBlock, chain: Arc>, @@ -59,28 +86,29 @@ pub async fn publish_block Result { let seen_timestamp = timestamp_now(); - let (block_contents, is_locally_built_block) = match provenanced_block { - ProvenancedBlock::Local(block_contents, _) => (block_contents, true), - ProvenancedBlock::Builder(block_contents, _) => (block_contents, false), + let (unverified_block, unverified_blobs, is_locally_built_block) = match provenanced_block { + ProvenancedBlock::Local(block, blobs, _) => (block, blobs, true), + ProvenancedBlock::Builder(block, blobs, _) => (block, blobs, false), }; let provenance = if is_locally_built_block { "local" } else { "builder" }; - let block = block_contents.inner_block().clone(); - let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock); + let block = unverified_block.inner_block(); debug!(log, "Signed block received in HTTP API"; "slot" => block.slot()); let malicious_withhold_count = chain.config.malicious_withhold_count; let chain_cloned = chain.clone(); /* actually publish a block */ - let publish_block = move |block: Arc>, - blobs_opt: Option>, - data_cols_opt: Option>, - sender, - log, - seen_timestamp| { + let publish_block_p2p = move |block: Arc>, + should_publish_block: bool, + blob_sidecars: Vec>>, + mut data_column_sidecars: DataColumnSidecarList, + sender, + log, + seen_timestamp| + -> Result<(), BlockError> { let publish_timestamp = timestamp_now(); let publish_delay = publish_timestamp .checked_sub(seen_timestamp) @@ -92,55 +120,48 @@ pub async fn publish_block block.slot(), - "publish_delay_ms" => publish_delay.as_millis() - ); + let mut pubsub_messages = if should_publish_block { + info!( + log, + "Signed block published to network via HTTP API"; + "slot" => block.slot(), + "blobs_published" => blob_sidecars.len(), + "publish_delay_ms" => publish_delay.as_millis(), + ); + vec![PubsubMessage::BeaconBlock(block.clone())] + } else { + vec![] + }; match block.as_ref() { SignedBeaconBlock::Base(_) | SignedBeaconBlock::Altair(_) | SignedBeaconBlock::Bellatrix(_) | SignedBeaconBlock::Capella(_) => { - crate::publish_pubsub_message(&sender, PubsubMessage::BeaconBlock(block)) + crate::publish_pubsub_messages(&sender, pubsub_messages) .map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))?; } SignedBeaconBlock::Deneb(_) | SignedBeaconBlock::Electra(_) => { - let mut pubsub_messages = vec![PubsubMessage::BeaconBlock(block)]; - if let Some(blob_sidecars) = blobs_opt { - // Publish blob sidecars - for (blob_index, blob) in blob_sidecars.into_iter().enumerate() { - pubsub_messages.push(PubsubMessage::BlobSidecar(Box::new(( - blob_index as u64, - blob, - )))); - } + for blob in blob_sidecars.into_iter() { + pubsub_messages.push(PubsubMessage::BlobSidecar(Box::new((blob.index, blob)))); } - if let Some(data_col_sidecars) = data_cols_opt { - let mut data_col_sidecars = data_col_sidecars.to_vec(); - if malicious_withhold_count > 0 { - let columns_to_keep = data_col_sidecars - .len() - .saturating_sub(malicious_withhold_count); - // Randomize columns before dropping the last malicious_withhold_count items - data_col_sidecars.shuffle(&mut rand::thread_rng()); - data_col_sidecars = data_col_sidecars - .into_iter() - .take(columns_to_keep) - .collect::>(); - } - - for data_col in data_col_sidecars { - let subnet = DataColumnSubnetId::from_column_index::( - data_col.index as usize, - &chain_cloned.spec, - ); - pubsub_messages.push(PubsubMessage::DataColumnSidecar(Box::new(( - subnet, data_col, - )))); - } + if malicious_withhold_count > 0 { + let columns_to_keep = data_column_sidecars + .len() + .saturating_sub(malicious_withhold_count); + // Randomize columns before dropping the last malicious_withhold_count items + data_column_sidecars.shuffle(&mut rand::thread_rng()); + drop(data_column_sidecars.drain(columns_to_keep..)); + } + + for data_col in data_column_sidecars { + let subnet = DataColumnSubnetId::from_column_index::( + data_col.index as usize, + &chain_cloned.spec, + ); + pubsub_messages.push(PubsubMessage::DataColumnSidecar(Box::new(( + subnet, data_col, + )))); } crate::publish_pubsub_messages(&sender, pubsub_messages) .map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))?; @@ -150,72 +171,162 @@ pub async fn publish_block b, - Err(BlockContentsError::BlockError(BlockError::BlockIsAlreadyKnown(_))) - | Err(BlockContentsError::BlobError( - beacon_chain::blob_verification::GossipBlobError::RepeatBlob { .. }, - )) => { - // Allow the status code for duplicate blocks to be overridden based on config. - return Ok(warp::reply::with_status( - warp::reply::json(&ErrorMessage { - code: duplicate_status_code.as_u16(), - message: "duplicate block".to_string(), - stacktraces: vec![], - }), - duplicate_status_code, - ) - .into_response()); - } - Err(e) => { - warn!( - log, - "Not publishing block - not gossip verified"; - "slot" => slot, - "error" => %e - ); - return Err(warp_utils::reject::custom_bad_request(e.to_string())); - } - }; - // TODO(das): We could potentially get rid of these conversions and pass `GossipVerified` types - // to `publish_block`, i.e. have `GossipVerified` types in `PubsubMessage`? - // This saves us from extra code and provides guarantee that published - // components are verified. - // Clone here, so we can take advantage of the `Arc`. The block in `BlockContents` is not, - // `Arc`'d but blobs are. - let block = gossip_verified_block.block.block_cloned(); - let blobs_opt = gossip_verified_blobs.as_ref().map(|gossip_verified_blobs| { - let blobs = gossip_verified_blobs - .into_iter() - .map(|b| b.clone_blob()) - .collect::>(); - VariableList::from(blobs) - }); - let data_cols_opt = gossip_verified_data_columns - .as_ref() - .map(|gossip_verified_data_columns| { - gossip_verified_data_columns + // Convert blobs to either: + // + // 1. Blob sidecars if prior to peer DAS, or + // 2. Data column sidecars if post peer DAS. + let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch()); + + let (blob_sidecars, data_column_sidecars) = match unverified_blobs { + // Pre-PeerDAS: construct blob sidecars for the network. + Some((kzg_proofs, blobs)) if !peer_das_enabled => { + let blob_sidecars = kzg_proofs .into_iter() - .map(|col| col.clone_data_column()) - .collect::>() - }); + .zip(blobs) + .enumerate() + .map(|(i, (proof, unverified_blob))| { + let _timer = metrics::start_timer( + &beacon_chain::metrics::BLOB_SIDECAR_INCLUSION_PROOF_COMPUTATION, + ); + let blob_sidecar = + BlobSidecar::new(i, unverified_blob, &block, proof).map(Arc::new); + blob_sidecar.map_err(|e| { + error!( + log, + "Invalid blob - not publishing block"; + "error" => ?e, + "blob_index" => i, + "slot" => slot, + ); + warp_utils::reject::custom_bad_request(format!("{e:?}")) + }) + }) + .collect::, Rejection>>()?; + (blob_sidecars, vec![]) + } + // Post PeerDAS: construct data columns. + Some((_, blobs)) => { + // TODO(das): this is sub-optimal and should likely not be happening prior to gossip + // block publishing. + let data_column_sidecars = build_blob_data_column_sidecars(&chain, &block, blobs) + .map_err(|e| { + error!( + log, + "Invalid data column - not publishing block"; + "error" => ?e, + "slot" => slot + ); + warp_utils::reject::custom_bad_request(format!("{e:?}")) + })?; + (vec![], data_column_sidecars) + } + None => (vec![], vec![]), + }; - let block_root = block_root.unwrap_or(gossip_verified_block.block_root); + // Gossip verify the block and blobs/data columns separately. + let gossip_verified_block_result = unverified_block.into_gossip_verified_block(&chain); + let gossip_verified_blobs = blob_sidecars + .into_iter() + .map(|blob_sidecar| { + let gossip_verified_blob = + GossipVerifiedBlob::new(blob_sidecar.clone(), blob_sidecar.index, &chain); + + match gossip_verified_blob { + Ok(blob) => Ok(Some(blob)), + Err(GossipBlobError::RepeatBlob { proposer, .. }) => { + // Log the error but do not abort publication, we may need to publish the block + // or some of the other blobs if the block & blobs are only partially published + // by the other publisher. + debug!( + log, + "Blob for publication already known"; + "blob_index" => blob_sidecar.index, + "slot" => slot, + "proposer" => proposer, + ); + Ok(None) + } + Err(e) => { + error!( + log, + "Blob for publication is gossip-invalid"; + "blob_index" => blob_sidecar.index, + "slot" => slot, + "error" => ?e, + ); + Err(warp_utils::reject::custom_bad_request(e.to_string())) + } + } + }) + .collect::, Rejection>>()?; + + let gossip_verified_data_columns = data_column_sidecars + .into_iter() + .map(|data_column_sidecar| { + let column_index = data_column_sidecar.index as usize; + let subnet = + DataColumnSubnetId::from_column_index::(column_index, &chain.spec); + let gossip_verified_column = + GossipVerifiedDataColumn::new(data_column_sidecar, subnet.into(), &chain); + + match gossip_verified_column { + Ok(blob) => Ok(Some(blob)), + Err(GossipDataColumnError::PriorKnown { proposer, .. }) => { + // Log the error but do not abort publication, we may need to publish the block + // or some of the other data columns if the block & data columns are only + // partially published by the other publisher. + debug!( + log, + "Data column for publication already known"; + "column_index" => column_index, + "slot" => slot, + "proposer" => proposer, + ); + Ok(None) + } + Err(e) => { + error!( + log, + "Data column for publication is gossip-invalid"; + "column_index" => column_index, + "slot" => slot, + "error" => ?e, + ); + Err(warp_utils::reject::custom_bad_request(format!("{e:?}"))) + } + } + }) + .collect::, Rejection>>()?; + + let publishable_blobs = gossip_verified_blobs + .iter() + .flatten() + .map(|b| b.clone_blob()) + .collect::>(); + + let publishable_data_columns = gossip_verified_data_columns + .iter() + .flatten() + .map(|b| b.clone_data_column()) + .collect::>(); + + let block_root = block_root.unwrap_or_else(|| { + gossip_verified_block_result.as_ref().map_or_else( + |_| block.canonical_root(), + |verified_block| verified_block.block_root, + ) + }); + let should_publish_block = gossip_verified_block_result.is_ok(); if let BroadcastValidation::Gossip = validation_level { - publish_block( + publish_block_p2p( block.clone(), - blobs_opt.clone(), - data_cols_opt.clone(), + should_publish_block, + publishable_blobs.clone(), + publishable_data_columns.clone(), sender_clone.clone(), log.clone(), seen_timestamp, @@ -223,71 +334,81 @@ pub async fn publish_block Ok(()), - BroadcastValidation::Consensus => publish_block( - block_clone, - blobs_opt, - data_cols_opt, - sender_clone, - log_clone, - seen_timestamp, - ), - BroadcastValidation::ConsensusAndEquivocation => { - check_slashable( - &chain_clone, - &blobs_opt, - block_root, - &block_clone, - &log_clone, - )?; - publish_block( - block_clone, - blobs_opt, - data_cols_opt, - sender_clone, - log_clone, + let publish_fn_completed = Arc::new(AtomicBool::new(false)); + let block_to_publish = block.clone(); + let publish_fn = || { + match validation_level { + BroadcastValidation::Gossip => (), + BroadcastValidation::Consensus => publish_block_p2p( + block_to_publish.clone(), + should_publish_block, + publishable_blobs.clone(), + publishable_data_columns.clone(), + sender_clone.clone(), + log.clone(), seen_timestamp, - ) - } + )?, + BroadcastValidation::ConsensusAndEquivocation => { + check_slashable(&chain, block_root, &block_to_publish, &log)?; + publish_block_p2p( + block_to_publish.clone(), + should_publish_block, + publishable_blobs.clone(), + publishable_data_columns.clone(), + sender_clone.clone(), + log.clone(), + seen_timestamp, + )?; + } + }; + publish_fn_completed.store(true, Ordering::SeqCst); + Ok(()) }; - if let Some(gossip_verified_blobs) = gossip_verified_blobs { - for blob in gossip_verified_blobs { - if let Err(e) = Box::pin(chain.process_gossip_blob(blob)).await { - let msg = format!("Invalid blob: {e}"); - return if let BroadcastValidation::Gossip = validation_level { - Err(warp_utils::reject::broadcast_without_import(msg)) - } else { - error!( - log, - "Invalid blob provided to HTTP API"; - "reason" => &msg - ); - Err(warp_utils::reject::custom_bad_request(msg)) - }; - } + for blob in gossip_verified_blobs.into_iter().flatten() { + // Importing the blobs could trigger block import and network publication in the case + // where the block was already seen on gossip. + if let Err(e) = Box::pin(chain.process_gossip_blob(blob, &publish_fn)).await { + let msg = format!("Invalid blob: {e}"); + return if let BroadcastValidation::Gossip = validation_level { + Err(warp_utils::reject::broadcast_without_import(msg)) + } else { + error!( + log, + "Invalid blob provided to HTTP API"; + "reason" => &msg + ); + Err(warp_utils::reject::custom_bad_request(msg)) + }; } } - if let Some(gossip_verified_data_columns) = gossip_verified_data_columns { + if gossip_verified_data_columns + .iter() + .map(Option::is_some) + .count() + > 0 + { let custody_columns_indices = &network_globals.custody_columns; let custody_columns = gossip_verified_data_columns .into_iter() + .flatten() .filter(|data_column| custody_columns_indices.contains(&data_column.index())) .collect(); - if let Err(e) = Box::pin(chain.process_gossip_data_columns(custody_columns)).await { + // Importing the columns could trigger block import and network publication in the case + // where the block was already seen on gossip. + if let Err(e) = + Box::pin(chain.process_gossip_data_columns(custody_columns, publish_fn)).await + { let msg = format!("Invalid data column: {e}"); return if let BroadcastValidation::Gossip = validation_level { Err(warp_utils::reject::broadcast_without_import(msg)) } else { error!( log, - "Invalid blob provided to HTTP API"; + "Invalid data column during block publication"; "reason" => &msg ); Err(warp_utils::reject::custom_bad_request(msg)) @@ -295,23 +416,117 @@ pub async fn publish_block { + match gossip_verified_block_result { + Ok(gossip_verified_block) => { + let import_result = Box::pin(chain.process_block( + block_root, + gossip_verified_block, + NotifyExecutionLayer::Yes, + BlockImportSource::HttpApi, + publish_fn, + )) + .await; + post_block_import_logging_and_response( + import_result, + validation_level, + block, + is_locally_built_block, + seen_timestamp, + &chain, + &log, + ) + .await + } + Err(BlockError::DuplicateFullyImported(root)) => { + if publish_fn_completed.load(Ordering::SeqCst) { + post_block_import_logging_and_response( + Ok(AvailabilityProcessingStatus::Imported(root)), + validation_level, + block, + is_locally_built_block, + seen_timestamp, + &chain, + &log, + ) + .await + } else { + // None of the components provided in this HTTP request were new, so this was an + // entirely redundant duplicate request. Return a status code indicating this, + // which can be overridden based on config. + Ok(warp::reply::with_status( + warp::reply::json(&ErrorMessage { + code: duplicate_status_code.as_u16(), + message: "duplicate block".to_string(), + stacktraces: vec![], + }), + duplicate_status_code, + ) + .into_response()) + } + } + Err(BlockError::DuplicateImportStatusUnknown(root)) => { + debug!( + log, + "Block previously seen"; + "block_root" => ?root, + "slot" => block.slot(), + ); + let import_result = Box::pin(chain.process_block( + block_root, + block.clone(), + NotifyExecutionLayer::Yes, + BlockImportSource::HttpApi, + publish_fn, + )) + .await; + post_block_import_logging_and_response( + import_result, + validation_level, + block, + is_locally_built_block, + seen_timestamp, + &chain, + &log, + ) + .await + } + Err(e) => { + warn!( + log, + "Not publishing block - not gossip verified"; + "slot" => slot, + "error" => %e + ); + Err(warp_utils::reject::custom_bad_request(e.to_string())) + } + } +} + +async fn post_block_import_logging_and_response( + result: Result, + validation_level: BroadcastValidation, + block: Arc>, + is_locally_built_block: bool, + seen_timestamp: Duration, + chain: &Arc>, + log: &Logger, +) -> Result { + match result { + // The `DuplicateFullyImported` case here captures the case where the block finishes + // being imported after gossip verification. It could be that it finished imported as a + // result of the block being imported from gossip, OR it could be that it finished importing + // after processing of a gossip blob. In the latter case we MUST run fork choice to + // re-compute the head. + Ok(AvailabilityProcessingStatus::Imported(root)) + | Err(BlockError::DuplicateFullyImported(root)) => { + let delay = get_block_delay_ms(seen_timestamp, block.message(), &chain.slot_clock); info!( log, "Valid block from HTTP API"; "block_delay" => ?delay, - "root" => format!("{}", root), - "proposer_index" => proposer_index, - "slot" =>slot, + "root" => %root, + "proposer_index" => block.message().proposer_index(), + "slot" => block.slot(), ); // Notify the validator monitor. @@ -330,7 +545,7 @@ pub async fn publish_block &msg + "reason" => ?e, ); Err(warp_utils::reject::custom_bad_request(format!( "Invalid block: {e}" @@ -385,7 +599,7 @@ pub async fn publish_blinded_block( network_globals: Arc>, ) -> Result { let block_root = blinded_block.canonical_root(); - let full_block: ProvenancedBlock> = + let full_block = reconstruct_block(chain.clone(), block_root, blinded_block, log.clone()).await?; publish_block::( Some(block_root), @@ -408,7 +622,7 @@ pub async fn reconstruct_block( block_root: Hash256, block: Arc>, log: Logger, -) -> Result>, Rejection> { +) -> Result>>, Rejection> { let full_payload_opt = if let Ok(payload_header) = block.message().body().execution_payload() { let el = chain.execution_layer.as_ref().ok_or_else(|| { warp_utils::reject::custom_server_error("Missing execution layer".to_string()) @@ -474,14 +688,17 @@ pub async fn reconstruct_block( match full_payload_opt { // A block without a payload is pre-merge and we consider it locally // built. - None => into_full_block_and_blobs(block, None).map(ProvenancedBlock::local), + None => block + .try_into_full_block(None) + .ok_or("Failed to build full block with payload".to_string()) + .map(|full_block| ProvenancedBlock::local(Arc::new(full_block), None)), Some(ProvenancedPayload::Local(full_payload_contents)) => { - into_full_block_and_blobs(block, Some(full_payload_contents)) - .map(ProvenancedBlock::local) + into_full_block_and_blobs::(block, full_payload_contents) + .map(|(block, blobs)| ProvenancedBlock::local(block, blobs)) } Some(ProvenancedPayload::Builder(full_payload_contents)) => { - into_full_block_and_blobs(block, Some(full_payload_contents)) - .map(ProvenancedBlock::builder) + into_full_block_and_blobs::(block, full_payload_contents) + .map(|(block, blobs)| ProvenancedBlock::builder(block, blobs)) } } .map_err(|e| { @@ -540,28 +757,11 @@ fn late_block_logging>( /// Check if any of the blobs or the block are slashable. Returns `BlockError::Slashable` if so. fn check_slashable( chain_clone: &BeaconChain, - blobs_opt: &Option>, block_root: Hash256, block_clone: &SignedBeaconBlock>, log_clone: &Logger, ) -> Result<(), BlockError> { let slashable_cache = chain_clone.observed_slashable.read(); - if let Some(blobs) = blobs_opt.as_ref() { - blobs.iter().try_for_each(|blob| { - if slashable_cache - .is_slashable(blob.slot(), blob.block_proposer_index(), blob.block_root()) - .map_err(|e| BlockError::BeaconChainError(e.into()))? - { - warn!( - log_clone, - "Not publishing equivocating blob"; - "slot" => block_clone.slot() - ); - return Err(BlockError::Slashable); - } - Ok(()) - })?; - }; if slashable_cache .is_slashable( block_clone.slot(), @@ -579,3 +779,38 @@ fn check_slashable( } Ok(()) } + +/// Converting from a `SignedBlindedBeaconBlock` into a full `SignedBlockContents`. +#[allow(clippy::type_complexity)] +pub fn into_full_block_and_blobs( + blinded_block: SignedBlindedBeaconBlock, + maybe_full_payload_contents: FullPayloadContents, +) -> Result<(Arc>, UnverifiedBlobs), String> { + match maybe_full_payload_contents { + // This variant implies a pre-deneb block + FullPayloadContents::Payload(execution_payload) => { + let signed_block = blinded_block + .try_into_full_block(Some(execution_payload)) + .ok_or("Failed to build full block with payload".to_string())?; + Ok((Arc::new(signed_block), None)) + } + // This variant implies a post-deneb block + FullPayloadContents::PayloadAndBlobs(payload_and_blobs) => { + let ExecutionPayloadAndBlobs { + execution_payload, + blobs_bundle, + } = payload_and_blobs; + let signed_block = blinded_block + .try_into_full_block(Some(execution_payload)) + .ok_or("Failed to build full block with payload".to_string())?; + + let BlobsBundle { + commitments: _, + proofs, + blobs, + } = blobs_bundle; + + Ok((Arc::new(signed_block), Some((proofs, blobs)))) + } + } +} diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index 4742fa109ff..7b48d64e36f 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -61,7 +61,8 @@ type Mutator = BoxedMutator, MemoryStore>; impl InteractiveTester { pub async fn new(spec: Option, validator_count: usize) -> Self { - Self::new_with_initializer_and_mutator(spec, validator_count, None, None).await + Self::new_with_initializer_and_mutator(spec, validator_count, None, None, Config::default()) + .await } pub async fn new_with_initializer_and_mutator( @@ -69,6 +70,7 @@ impl InteractiveTester { validator_count: usize, initializer: Option>, mutator: Option>, + config: Config, ) -> Self { let mut harness_builder = BeaconChainHarness::builder(E::default()) .spec_or_default(spec.map(Arc::new)) @@ -99,8 +101,9 @@ impl InteractiveTester { listening_socket, network_rx, .. - } = create_api_server( + } = create_api_server_with_config( harness.chain.clone(), + config, &harness.runtime, harness.logger().clone(), ) @@ -131,6 +134,15 @@ pub async fn create_api_server( chain: Arc>, test_runtime: &TestRuntime, log: Logger, +) -> ApiServer> { + create_api_server_with_config(chain, Config::default(), test_runtime, log).await +} + +pub async fn create_api_server_with_config( + chain: Arc>, + http_config: Config, + test_runtime: &TestRuntime, + log: Logger, ) -> ApiServer> { // Use port 0 to allocate a new unused port. let port = 0; @@ -220,12 +232,14 @@ pub async fn create_api_server( .unwrap(); let ctx = Arc::new(Context { + // Override several config fields with defaults. If these need to be tweaked in future + // we could remove these overrides. config: Config { enabled: true, listen_port: port, data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR), enable_light_client_server: true, - ..Config::default() + ..http_config }, chain: Some(chain), network_senders: Some(network_senders), diff --git a/beacon_node/http_api/tests/broadcast_validation_tests.rs b/beacon_node/http_api/tests/broadcast_validation_tests.rs index 59cdbb1c99e..f55983ec66a 100644 --- a/beacon_node/http_api/tests/broadcast_validation_tests.rs +++ b/beacon_node/http_api/tests/broadcast_validation_tests.rs @@ -1,13 +1,16 @@ +use beacon_chain::blob_verification::GossipVerifiedBlob; use beacon_chain::{ test_utils::{AttestationStrategy, BlockStrategy}, - GossipVerifiedBlock, IntoGossipVerifiedBlockContents, + GossipVerifiedBlock, IntoGossipVerifiedBlock, }; use eth2::reqwest::StatusCode; use eth2::types::{BroadcastValidation, PublishBlockRequest}; use http_api::test_utils::InteractiveTester; -use http_api::{publish_blinded_block, publish_block, reconstruct_block, ProvenancedBlock}; +use http_api::{publish_blinded_block, publish_block, reconstruct_block, Config, ProvenancedBlock}; use std::sync::Arc; -use types::{Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, MainnetEthSpec, Slot}; +use types::{ + BlobSidecar, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, MainnetEthSpec, Slot, +}; use warp::Rejection; use warp_utils::reject::CustomBadRequest; @@ -81,7 +84,7 @@ pub async fn gossip_invalid() { /* mandated by Beacon API spec */ assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); - assert_server_message_error(error_response, "BAD_REQUEST: BlockError(NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 })".to_string()); + assert_server_message_error(error_response, "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string()); } /// This test checks that a block that is valid from a gossip perspective is accepted when using `broadcast_validation=gossip`. @@ -266,7 +269,7 @@ pub async fn consensus_invalid() { /* mandated by Beacon API spec */ assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); - assert_server_message_error(error_response, "BAD_REQUEST: BlockError(NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 })".to_string()); + assert_server_message_error(error_response, "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string()); } /// This test checks that a block that is only valid from a gossip perspective is rejected when using `broadcast_validation=consensus`. @@ -360,10 +363,9 @@ pub async fn consensus_partial_pass_only_consensus() { ); assert_ne!(block_a.state_root(), block_b.state_root()); - let gossip_block_contents_b = PublishBlockRequest::new(block_b, blobs_b) - .into_gossip_verified_block(&tester.harness.chain); - assert!(gossip_block_contents_b.is_ok()); - let gossip_block_a = GossipVerifiedBlock::new(block_a.clone().into(), &tester.harness.chain); + let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain); + assert!(gossip_block_b.is_ok()); + let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain); assert!(gossip_block_a.is_err()); /* submit `block_b` which should induce equivocation */ @@ -372,7 +374,7 @@ pub async fn consensus_partial_pass_only_consensus() { let publication_result = publish_block( None, - ProvenancedBlock::local(gossip_block_contents_b.unwrap()), + ProvenancedBlock::local(gossip_block_b.unwrap(), blobs_b), tester.harness.chain.clone(), &channel.0, test_logger, @@ -382,7 +384,7 @@ pub async fn consensus_partial_pass_only_consensus() { ) .await; - assert!(publication_result.is_ok()); + assert!(publication_result.is_ok(), "{publication_result:?}"); assert!(tester .harness .chain @@ -481,7 +483,7 @@ pub async fn equivocation_invalid() { /* mandated by Beacon API spec */ assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); - assert_server_message_error(error_response, "BAD_REQUEST: BlockError(NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 })".to_string()); + assert_server_message_error(error_response, "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string()); } /// This test checks that a block that is valid from both a gossip and consensus perspective is rejected when using `broadcast_validation=consensus_and_equivocation`. @@ -555,10 +557,7 @@ pub async fn equivocation_consensus_early_equivocation() { let error_response: eth2::Error = response.err().unwrap(); assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); - assert_server_message_error( - error_response, - "BAD_REQUEST: BlockError(Slashable)".to_string(), - ); + assert_server_message_error(error_response, "BAD_REQUEST: Slashable".to_string()); } /// This test checks that a block that is only valid from a gossip perspective is rejected when using `broadcast_validation=consensus_and_equivocation`. @@ -642,7 +641,7 @@ pub async fn equivocation_consensus_late_equivocation() { let slot_b = slot_a + 1; let state_a = tester.harness.get_current_state(); - let ((block_a, blobs_a), mut state_after_a) = + let ((block_a, _blobs_a), mut state_after_a) = tester.harness.make_block(state_a.clone(), slot_b).await; let ((block_b, blobs_b), mut state_after_b) = tester.harness.make_block(state_a, slot_b).await; @@ -657,19 +656,18 @@ pub async fn equivocation_consensus_late_equivocation() { ); assert_ne!(block_a.state_root(), block_b.state_root()); - let gossip_block_contents_b = PublishBlockRequest::new(block_b, blobs_b) - .into_gossip_verified_block(&tester.harness.chain); - assert!(gossip_block_contents_b.is_ok()); - let gossip_block_contents_a = PublishBlockRequest::new(block_a, blobs_a) - .into_gossip_verified_block(&tester.harness.chain); - assert!(gossip_block_contents_a.is_err()); + let gossip_block_b = block_b.into_gossip_verified_block(&tester.harness.chain); + assert!(gossip_block_b.is_ok()); + + let gossip_block_a = block_a.into_gossip_verified_block(&tester.harness.chain); + assert!(gossip_block_a.is_err()); let channel = tokio::sync::mpsc::unbounded_channel(); let network_globals = tester.ctx.network_globals.clone().unwrap(); let publication_result = publish_block( None, - ProvenancedBlock::local(gossip_block_contents_b.unwrap()), + ProvenancedBlock::local(gossip_block_b.unwrap(), blobs_b), tester.harness.chain, &channel.0, test_logger, @@ -686,8 +684,8 @@ pub async fn equivocation_consensus_late_equivocation() { assert!(publication_error.find::().is_some()); assert_eq!( - *publication_error.find::().unwrap().0, - "proposal for this slot and proposer has already been seen".to_string() + publication_error.find::().unwrap().0, + "proposal for this slot and proposer has already been seen" ); } @@ -783,7 +781,7 @@ pub async fn blinded_gossip_invalid() { /* mandated by Beacon API spec */ assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); - assert_server_message_error(error_response, "BAD_REQUEST: BlockError(NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 })".to_string()); + assert_server_message_error(error_response, "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string()); } /// This test checks that a block that is valid from a gossip perspective is accepted when using `broadcast_validation=gossip`. @@ -961,7 +959,7 @@ pub async fn blinded_consensus_invalid() { /* mandated by Beacon API spec */ assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); - assert_server_message_error(error_response, "BAD_REQUEST: BlockError(NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 })".to_string()); + assert_server_message_error(error_response, "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string()); } /// This test checks that a block that is only valid from a gossip perspective is rejected when using `broadcast_validation=consensus`. @@ -1099,7 +1097,7 @@ pub async fn blinded_equivocation_invalid() { /* mandated by Beacon API spec */ assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); - assert_server_message_error(error_response, "BAD_REQUEST: BlockError(NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 })".to_string()); + assert_server_message_error(error_response, "BAD_REQUEST: NotFinalizedDescendant { block_parent_root: 0x0000000000000000000000000000000000000000000000000000000000000000 }".to_string()); } /// This test checks that a block that is valid from both a gossip and consensus perspective is rejected when using `broadcast_validation=consensus_and_equivocation`. @@ -1169,10 +1167,7 @@ pub async fn blinded_equivocation_consensus_early_equivocation() { let error_response: eth2::Error = response.err().unwrap(); assert_eq!(error_response.status(), Some(StatusCode::BAD_REQUEST)); - assert_server_message_error( - error_response, - "BAD_REQUEST: BlockError(Slashable)".to_string(), - ); + assert_server_message_error(error_response, "BAD_REQUEST: Slashable".to_string()); } /// This test checks that a block that is only valid from a gossip perspective is rejected when using `broadcast_validation=consensus_and_equivocation`. @@ -1295,19 +1290,17 @@ pub async fn blinded_equivocation_consensus_late_equivocation() { .unwrap(); let inner_block_a = match unblinded_block_a { - ProvenancedBlock::Local(a, _) => a, - ProvenancedBlock::Builder(a, _) => a, + ProvenancedBlock::Local(a, _, _) => a, + ProvenancedBlock::Builder(a, _, _) => a, }; let inner_block_b = match unblinded_block_b { - ProvenancedBlock::Local(b, _) => b, - ProvenancedBlock::Builder(b, _) => b, + ProvenancedBlock::Local(b, _, _) => b, + ProvenancedBlock::Builder(b, _, _) => b, }; - let gossip_block_b = - GossipVerifiedBlock::new(inner_block_b.clone().deconstruct().0, &tester.harness.chain); + let gossip_block_b = GossipVerifiedBlock::new(inner_block_b, &tester.harness.chain); assert!(gossip_block_b.is_ok()); - let gossip_block_a = - GossipVerifiedBlock::new(inner_block_a.clone().deconstruct().0, &tester.harness.chain); + let gossip_block_a = GossipVerifiedBlock::new(inner_block_a, &tester.harness.chain); assert!(gossip_block_a.is_err()); let channel = tokio::sync::mpsc::unbounded_channel(); @@ -1374,6 +1367,438 @@ pub async fn blinded_equivocation_full_pass() { .block_is_known_to_fork_choice(&block.canonical_root())); } +/// This test checks that an HTTP POST request with the block & blobs succeeds with a 200 response +/// even if the block has already been seen on gossip without any blobs. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn block_seen_on_gossip_without_blobs() { + let validation_level: Option = Some(BroadcastValidation::Gossip); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let spec = ForkName::latest().make_genesis_spec(E::default_spec()); + let tester = InteractiveTester::::new(Some(spec), validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let ((block, blobs), _) = tester.harness.make_block(state_a, slot_b).await; + let blobs = blobs.expect("should have some blobs"); + assert_ne!(blobs.0.len(), 0); + + // Simulate the block being seen on gossip. + block + .clone() + .into_gossip_verified_block(&tester.harness.chain) + .unwrap(); + + // It should not yet be added to fork choice because blobs have not been seen. + assert!(!tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); + + // Post the block *and* blobs to the HTTP API. + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2( + &PublishBlockRequest::new(block.clone(), Some(blobs)), + validation_level, + ) + .await; + + // This should result in the block being fully imported. + response.unwrap(); + assert!(tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); +} + +/// This test checks that an HTTP POST request with the block & blobs succeeds with a 200 response +/// even if the block has already been seen on gossip without all blobs. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn block_seen_on_gossip_with_some_blobs() { + let validation_level: Option = Some(BroadcastValidation::Gossip); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let spec = ForkName::latest().make_genesis_spec(E::default_spec()); + let tester = InteractiveTester::::new(Some(spec), validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let ((block, blobs), _) = tester.harness.make_block(state_a, slot_b).await; + let blobs = blobs.expect("should have some blobs"); + assert!( + blobs.0.len() >= 2, + "need at least 2 blobs for partial reveal" + ); + + let partial_kzg_proofs = vec![blobs.0.get(0).unwrap().clone()]; + let partial_blobs = vec![blobs.1.get(0).unwrap().clone()]; + + // Simulate the block being seen on gossip. + block + .clone() + .into_gossip_verified_block(&tester.harness.chain) + .unwrap(); + + // Simulate some of the blobs being seen on gossip. + for (i, (kzg_proof, blob)) in partial_kzg_proofs + .into_iter() + .zip(partial_blobs) + .enumerate() + { + let sidecar = Arc::new(BlobSidecar::new(i, blob, &block, kzg_proof).unwrap()); + let gossip_blob = + GossipVerifiedBlob::new(sidecar, i as u64, &tester.harness.chain).unwrap(); + tester + .harness + .chain + .process_gossip_blob(gossip_blob, || panic!("should not publish block yet")) + .await + .unwrap(); + } + + // It should not yet be added to fork choice because all blobs have not been seen. + assert!(!tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); + + // Post the block *and* all blobs to the HTTP API. + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2( + &PublishBlockRequest::new(block.clone(), Some(blobs)), + validation_level, + ) + .await; + + // This should result in the block being fully imported. + response.unwrap(); + assert!(tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); +} + +/// This test checks that an HTTP POST request with the block & blobs succeeds with a 200 response +/// even if the blobs have already been seen on gossip. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn blobs_seen_on_gossip_without_block() { + let validation_level: Option = Some(BroadcastValidation::Gossip); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let spec = ForkName::latest().make_genesis_spec(E::default_spec()); + let tester = InteractiveTester::::new(Some(spec), validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let ((block, blobs), _) = tester.harness.make_block(state_a, slot_b).await; + let (kzg_proofs, blobs) = blobs.expect("should have some blobs"); + + // Simulate the blobs being seen on gossip. + for (i, (kzg_proof, blob)) in kzg_proofs + .clone() + .into_iter() + .zip(blobs.clone()) + .enumerate() + { + let sidecar = Arc::new(BlobSidecar::new(i, blob, &block, kzg_proof).unwrap()); + let gossip_blob = + GossipVerifiedBlob::new(sidecar, i as u64, &tester.harness.chain).unwrap(); + tester + .harness + .chain + .process_gossip_blob(gossip_blob, || panic!("should not publish block yet")) + .await + .unwrap(); + } + + // It should not yet be added to fork choice because the block has not been seen. + assert!(!tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); + + // Post the block *and* all blobs to the HTTP API. + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2( + &PublishBlockRequest::new(block.clone(), Some((kzg_proofs, blobs))), + validation_level, + ) + .await; + + // This should result in the block being fully imported. + response.unwrap(); + assert!(tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); +} + +/// This test checks that an HTTP POST request with the block succeeds with a 200 response +/// if just the blobs have already been seen on gossip. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn blobs_seen_on_gossip_without_block_and_no_http_blobs() { + let validation_level: Option = Some(BroadcastValidation::Gossip); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let spec = ForkName::latest().make_genesis_spec(E::default_spec()); + let tester = InteractiveTester::::new(Some(spec), validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let ((block, blobs), _) = tester.harness.make_block(state_a, slot_b).await; + let (kzg_proofs, blobs) = blobs.expect("should have some blobs"); + assert!(!blobs.is_empty()); + + // Simulate the blobs being seen on gossip. + for (i, (kzg_proof, blob)) in kzg_proofs + .clone() + .into_iter() + .zip(blobs.clone()) + .enumerate() + { + let sidecar = Arc::new(BlobSidecar::new(i, blob, &block, kzg_proof).unwrap()); + let gossip_blob = + GossipVerifiedBlob::new(sidecar, i as u64, &tester.harness.chain).unwrap(); + tester + .harness + .chain + .process_gossip_blob(gossip_blob, || panic!("should not publish block yet")) + .await + .unwrap(); + } + + // It should not yet be added to fork choice because the block has not been seen. + assert!(!tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); + + // Post just the block to the HTTP API (blob lists are empty). + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2( + &PublishBlockRequest::new( + block.clone(), + Some((Default::default(), Default::default())), + ), + validation_level, + ) + .await; + + // This should result in the block being fully imported. + response.unwrap(); + assert!(tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn slashable_blobs_seen_on_gossip_cause_failure() { + let validation_level: Option = + Some(BroadcastValidation::ConsensusAndEquivocation); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let spec = ForkName::latest().make_genesis_spec(E::default_spec()); + let tester = InteractiveTester::::new(Some(spec), validator_count).await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let ((block_a, blobs_a), _) = tester.harness.make_block(state_a.clone(), slot_b).await; + let ((block_b, blobs_b), _) = tester.harness.make_block(state_a, slot_b).await; + let (kzg_proofs_a, blobs_a) = blobs_a.expect("should have some blobs"); + let (kzg_proofs_b, blobs_b) = blobs_b.expect("should have some blobs"); + + // Simulate the blobs of block B being seen on gossip. + for (i, (kzg_proof, blob)) in kzg_proofs_b.into_iter().zip(blobs_b).enumerate() { + let sidecar = Arc::new(BlobSidecar::new(i, blob, &block_b, kzg_proof).unwrap()); + let gossip_blob = + GossipVerifiedBlob::new(sidecar, i as u64, &tester.harness.chain).unwrap(); + tester + .harness + .chain + .process_gossip_blob(gossip_blob, || panic!("should not publish block yet")) + .await + .unwrap(); + } + + // It should not yet be added to fork choice because block B has not been seen. + assert!(!tester + .harness + .chain + .block_is_known_to_fork_choice(&block_b.canonical_root())); + + // Post block A *and* all its blobs to the HTTP API. + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2( + &PublishBlockRequest::new(block_a.clone(), Some((kzg_proofs_a, blobs_a))), + validation_level, + ) + .await; + + // This should not result in block A being fully imported. + response.unwrap_err(); + assert!(!tester + .harness + .chain + .block_is_known_to_fork_choice(&block_a.canonical_root())); +} + +/// This test checks that an HTTP POST request with a duplicate block & blobs results in the +/// `duplicate_status_code` being returned. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn duplicate_block_status_code() { + let validation_level: Option = Some(BroadcastValidation::Gossip); + + // Validator count needs to be at least 32 or proposer boost gets set to 0 when computing + // `validator_count // 32`. + let validator_count = 64; + let num_initial: u64 = 31; + let spec = ForkName::latest().make_genesis_spec(E::default_spec()); + let duplicate_block_status_code = StatusCode::IM_A_TEAPOT; + let tester = InteractiveTester::::new_with_initializer_and_mutator( + Some(spec), + validator_count, + None, + None, + Config { + duplicate_block_status_code, + ..Config::default() + }, + ) + .await; + + // Create some chain depth. + tester.harness.advance_slot(); + tester + .harness + .extend_chain( + num_initial as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + tester.harness.advance_slot(); + + let slot_a = Slot::new(num_initial); + let slot_b = slot_a + 1; + + let state_a = tester.harness.get_current_state(); + let ((block, blobs), _) = tester.harness.make_block(state_a, slot_b).await; + let (kzg_proofs, blobs) = blobs.expect("should have some blobs"); + + // Post the block blobs to the HTTP API once. + let block_request = PublishBlockRequest::new(block.clone(), Some((kzg_proofs, blobs))); + let response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2(&block_request, validation_level) + .await; + + // This should result in the block being fully imported. + response.unwrap(); + assert!(tester + .harness + .chain + .block_is_known_to_fork_choice(&block.canonical_root())); + + // Post again. + let duplicate_response: Result<(), eth2::Error> = tester + .client + .post_beacon_blocks_v2(&block_request, validation_level) + .await; + let err = duplicate_response.unwrap_err(); + assert_eq!(err.status().unwrap(), duplicate_block_status_code); +} + fn assert_server_message_error(error_response: eth2::Error, expected_message: String) { let eth2::Error::ServerMessage(err) = error_response else { panic!("Not a eth2::Error::ServerMessage"); diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs index b5b3edf892e..8cb6053e9ff 100644 --- a/beacon_node/http_api/tests/fork_tests.rs +++ b/beacon_node/http_api/tests/fork_tests.rs @@ -386,6 +386,7 @@ async fn bls_to_execution_changes_update_all_around_capella_fork() { .genesis_state_ephemeral_store(genesis_state) })), None, + Default::default(), ) .await; let harness = &tester.harness; diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 5034492e250..c3ed3347821 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -72,6 +72,7 @@ async fn state_by_root_pruned_from_fork_choice() { }) })), None, + Default::default(), ) .await; @@ -427,6 +428,7 @@ pub async fn proposer_boost_re_org_test( DisallowedReOrgOffsets::new::(disallowed_offsets).unwrap(), ) })), + Default::default(), ) .await; let harness = &tester.harness; diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index ddcd74d20b5..005536bcf25 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -936,7 +936,10 @@ impl NetworkBeaconProcessor { let blob_slot = verified_blob.slot(); let blob_index = verified_blob.id().index; - let result = self.chain.process_gossip_blob(verified_blob).await; + let result = self + .chain + .process_gossip_blob(verified_blob, || Ok(())) + .await; match &result { Ok(AvailabilityProcessingStatus::Imported(block_root)) => { @@ -963,7 +966,7 @@ impl NetworkBeaconProcessor { "block_root" => %block_root, ); } - Err(BlockError::BlockIsAlreadyKnown(_)) => { + Err(BlockError::DuplicateFullyImported(_)) => { debug!( self.log, "Ignoring gossip blob already imported"; @@ -1013,7 +1016,7 @@ impl NetworkBeaconProcessor { match self .chain - .process_gossip_data_columns(vec![verified_data_column]) + .process_gossip_data_columns(vec![verified_data_column], || Ok(())) .await { Ok((availability, data_columns_to_publish)) => { @@ -1050,7 +1053,7 @@ impl NetworkBeaconProcessor { } } } - Err(BlockError::BlockIsAlreadyKnown(_)) => { + Err(BlockError::DuplicateFullyImported(_)) => { debug!( self.log, "Ignoring gossip column already imported"; @@ -1242,7 +1245,10 @@ impl NetworkBeaconProcessor { self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); return None; } - Err(BlockError::BlockIsAlreadyKnown(_)) => { + Err( + BlockError::DuplicateFullyImported(_) + | BlockError::DuplicateImportStatusUnknown(..), + ) => { debug!( self.log, "Gossip block is already known"; diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 50c7ee05a17..dcad6160b34 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -294,7 +294,7 @@ impl NetworkBeaconProcessor { "slot" => %slot, ); } - Err(BlockError::BlockIsAlreadyKnown(_)) => { + Err(BlockError::DuplicateFullyImported(_)) => { debug!( self.log, "Blobs have already been imported"; @@ -355,7 +355,7 @@ impl NetworkBeaconProcessor { } } } - Err(BlockError::BlockIsAlreadyKnown(_)) => { + Err(BlockError::DuplicateFullyImported(_)) => { debug!( self.log, "Custody columns have already been imported"; @@ -715,7 +715,8 @@ impl NetworkBeaconProcessor { peer_action: Some(PeerAction::LowToleranceError), }) } - BlockError::BlockIsAlreadyKnown(_) => { + BlockError::DuplicateFullyImported(_) + | BlockError::DuplicateImportStatusUnknown(..) => { // This can happen for many reasons. Head sync's can download multiples and parent // lookups can download blocks before range sync Ok(()) diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 9abcd263de6..a9dbf11fd06 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -517,7 +517,7 @@ impl BlockLookups { let action = match result { BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) - | BlockProcessingResult::Err(BlockError::BlockIsAlreadyKnown(_)) => { + | BlockProcessingResult::Err(BlockError::DuplicateFullyImported(..)) => { // Successfully imported request_state.on_processing_success()?; Action::Continue @@ -541,6 +541,16 @@ impl BlockLookups { Action::Retry } } + BlockProcessingResult::Err(BlockError::DuplicateImportStatusUnknown(..)) => { + // This is unreachable because RPC blocks do not undergo gossip verification, and + // this error can *only* come from gossip verification. + error!( + self.log, + "Single block lookup hit unreachable condition"; + "block_root" => ?block_root + ); + Action::Drop + } BlockProcessingResult::Ignored => { // Beacon processor signalled to ignore the block processing result. // This implies that the cpu is overloaded. Drop the request. diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 6e6c9a5cdfe..c0a766137bf 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1471,7 +1471,7 @@ fn test_parent_lookup_happy_path() { // Processing succeeds, now the rest of the chain should be sent for processing. rig.parent_block_processed( block_root, - BlockError::BlockIsAlreadyKnown(block_root).into(), + BlockError::DuplicateFullyImported(block_root).into(), ); rig.expect_parent_chain_process(); rig.parent_chain_processed_success(block_root, &[]); @@ -1839,7 +1839,7 @@ fn test_same_chain_race_condition() { rig.log(&format!("Block {i} was removed and is already known")); rig.parent_block_processed( chain_hash, - BlockError::BlockIsAlreadyKnown(block.canonical_root()).into(), + BlockError::DuplicateFullyImported(block.canonical_root()).into(), ) } else { rig.log(&format!("Block {i} ParentUnknown")); diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 07d04b3fb20..5b7003e5e85 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -629,8 +629,8 @@ impl SyncNetworkContext { // cache nor in the request state of this lookup. Therefore, the block must either: (1) not // be downloaded yet or (2) the block is already imported into the fork-choice. // In case (1) the lookup must either successfully download the block or get dropped. - // In case (2) the block will be downloaded, processed, reach `BlockIsAlreadyKnown` and - // get dropped as completed. + // In case (2) the block will be downloaded, processed, reach `DuplicateFullyImported` + // and get dropped as completed. return Ok(LookupRequestResult::Pending("waiting for block download")); }; let expected_blobs = block.num_expected_blobs(); diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 3925d2deda8..e1550fdee24 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -1883,42 +1883,6 @@ impl PublishBlockRequest { } } -/// Converting from a `SignedBlindedBeaconBlock` into a full `SignedBlockContents`. -pub fn into_full_block_and_blobs( - blinded_block: SignedBlindedBeaconBlock, - maybe_full_payload_contents: Option>, -) -> Result, String> { - match maybe_full_payload_contents { - None => { - let signed_block = blinded_block - .try_into_full_block(None) - .ok_or("Failed to build full block with payload".to_string())?; - Ok(PublishBlockRequest::new(Arc::new(signed_block), None)) - } - // This variant implies a pre-deneb block - Some(FullPayloadContents::Payload(execution_payload)) => { - let signed_block = blinded_block - .try_into_full_block(Some(execution_payload)) - .ok_or("Failed to build full block with payload".to_string())?; - Ok(PublishBlockRequest::new(Arc::new(signed_block), None)) - } - // This variant implies a post-deneb block - Some(FullPayloadContents::PayloadAndBlobs(payload_and_blobs)) => { - let signed_block = blinded_block - .try_into_full_block(Some(payload_and_blobs.execution_payload)) - .ok_or("Failed to build full block with payload".to_string())?; - - Ok(PublishBlockRequest::new( - Arc::new(signed_block), - Some(( - payload_and_blobs.blobs_bundle.proofs, - payload_and_blobs.blobs_bundle.blobs, - )), - )) - } - } -} - impl TryFrom>> for PublishBlockRequest { type Error = &'static str; fn try_from(block: Arc>) -> Result { diff --git a/consensus/types/src/data_column_sidecar.rs b/consensus/types/src/data_column_sidecar.rs index 90c05aea1f7..57251e319a4 100644 --- a/consensus/types/src/data_column_sidecar.rs +++ b/consensus/types/src/data_column_sidecar.rs @@ -161,6 +161,7 @@ pub enum DataColumnSidecarError { DataColumnIndexOutOfBounds, KzgCommitmentInclusionProofOutOfBounds, KzgError(KzgError), + KzgNotInitialized, MissingBlobSidecars, PreDeneb, SszError(SszError), diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index 33ae132e8a2..8d933a6fcd5 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -505,8 +505,8 @@ impl Tester { } Err(_) => GossipVerifiedBlob::__assumed_valid(blob_sidecar), }; - let result = - self.block_on_dangerous(self.harness.chain.process_gossip_blob(blob))?; + let result = self + .block_on_dangerous(self.harness.chain.process_gossip_blob(blob, || Ok(())))?; if valid { assert!(result.is_ok()); }