diff --git a/beacon_node/beacon_chain/benches/benches.rs b/beacon_node/beacon_chain/benches/benches.rs index b2f17062dce..c09af00be68 100644 --- a/beacon_node/beacon_chain/benches/benches.rs +++ b/beacon_node/beacon_chain/benches/benches.rs @@ -37,12 +37,15 @@ fn all_benches(c: &mut Criterion) { let kzg = get_kzg(&spec); for blob_count in [1, 2, 3, 6] { - let kzg = kzg.clone(); - let (signed_block, blob_sidecars) = create_test_block_and_blobs::(blob_count, &spec); + let (signed_block, blobs) = create_test_block_and_blobs::(blob_count, &spec); - let column_sidecars = - blobs_to_data_column_sidecars(&blob_sidecars, &signed_block, &kzg.clone(), &spec) - .unwrap(); + let column_sidecars = blobs_to_data_column_sidecars( + &blobs.iter().collect::>(), + &signed_block, + &kzg, + &spec, + ) + .unwrap(); let spec = spec.clone(); diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index f8dfbc55155..9289356507e 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -89,7 +89,7 @@ use kzg::Kzg; use operation_pool::{ CompactAttestationRef, OperationPool, PersistedOperationPool, ReceivedPreCapella, }; -use parking_lot::{Mutex, RwLock}; +use parking_lot::{Mutex, RwLock, RwLockWriteGuard}; use proto_array::{DoNotReOrg, ProposerHeadError}; use safe_arith::SafeArith; use slasher::Slasher; @@ -121,6 +121,7 @@ use store::{ DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp, }; use task_executor::{ShutdownReason, TaskExecutor}; +use tokio::sync::mpsc::Receiver; use tokio_stream::Stream; use tree_hash::TreeHash; use types::blob_sidecar::FixedBlobSidecarList; @@ -2976,7 +2977,6 @@ impl BeaconChain { pub async fn process_gossip_blob( self: &Arc, blob: GossipVerifiedBlob, - publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result { let block_root = blob.block_root(); @@ -2995,17 +2995,9 @@ impl BeaconChain { return Err(BlockError::BlobNotRequired(blob.slot())); } - if let Some(event_handler) = self.event_handler.as_ref() { - if event_handler.has_blob_sidecar_subscribers() { - event_handler.register(EventKind::BlobSidecar(SseBlobSidecar::from_blob_sidecar( - blob.as_blob(), - ))); - } - } + self.emit_sse_blob_sidecar_events(&block_root, std::iter::once(blob.as_blob())); - let r = self - .check_gossip_blob_availability_and_import(blob, publish_fn) - .await; + let r = self.check_gossip_blob_availability_and_import(blob).await; self.remove_notified(&block_root, r) } @@ -3083,20 +3075,63 @@ impl BeaconChain { } } + self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref)); + + let r = self + .check_rpc_blob_availability_and_import(slot, block_root, blobs) + .await; + self.remove_notified(&block_root, r) + } + + /// Processes blobs retrieved from the EL and returns the `AvailabilityProcessingStatus`. + /// + /// `data_column_recv`: An optional receiver for `DataColumnSidecarList`. + /// If PeerDAS is enabled, this receiver will be provided and used to send + /// the `DataColumnSidecar`s once they have been successfully computed. + pub async fn process_engine_blobs( + self: &Arc, + slot: Slot, + block_root: Hash256, + blobs: FixedBlobSidecarList, + data_column_recv: Option>>, + ) -> Result { + // If this block has already been imported into fork choice it must have been available, so + // we don't need to process its blobs again.
+ if self + .canonical_head + .fork_choice_read_lock() + .contains_block(&block_root) + { + return Err(BlockError::DuplicateFullyImported(block_root)); + } + + self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref)); + + let r = self + .check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv) + .await; + self.remove_notified(&block_root, r) + } + + fn emit_sse_blob_sidecar_events<'a, I>(self: &Arc, block_root: &Hash256, blobs_iter: I) + where + I: Iterator>, + { if let Some(event_handler) = self.event_handler.as_ref() { if event_handler.has_blob_sidecar_subscribers() { - for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) { + let imported_blobs = self + .data_availability_checker + .cached_blob_indexes(block_root) + .unwrap_or_default(); + let new_blobs = blobs_iter.filter(|b| !imported_blobs.contains(&b.index)); + + for blob in new_blobs { event_handler.register(EventKind::BlobSidecar( SseBlobSidecar::from_blob_sidecar(blob), )); } } } - - let r = self - .check_rpc_blob_availability_and_import(slot, block_root, blobs) - .await; - self.remove_notified(&block_root, r) } /// Cache the columns in the processing cache, process it, then evict it from the cache if it was @@ -3186,7 +3221,7 @@ impl BeaconChain { }; let r = self - .process_availability(slot, availability, || Ok(())) + .process_availability(slot, availability, None, || Ok(())) .await; self.remove_notified(&block_root, r) .map(|availability_processing_status| { @@ -3314,7 +3349,7 @@ impl BeaconChain { match executed_block { ExecutedBlock::Available(block) => { - self.import_available_block(Box::new(block)).await + self.import_available_block(Box::new(block), None).await } ExecutedBlock::AvailabilityPending(block) => { self.check_block_availability_and_import(block).await @@ -3446,7 +3481,7 @@ impl BeaconChain { let availability = self .data_availability_checker .put_pending_executed_block(block)?; - self.process_availability(slot, availability, || Ok(())) + self.process_availability(slot, availability, None, || Ok(())) .await } @@ -3455,7 +3490,6 @@ impl BeaconChain { async fn check_gossip_blob_availability_and_import( self: &Arc, blob: GossipVerifiedBlob, - publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result { let slot = blob.slot(); if let Some(slasher) = self.slasher.as_ref() { @@ -3463,7 +3497,7 @@ impl BeaconChain { } let availability = self.data_availability_checker.put_gossip_blob(blob)?; - self.process_availability(slot, availability, publish_fn) + self.process_availability(slot, availability, None, || Ok(())) .await } @@ -3482,16 +3516,41 @@ impl BeaconChain { } } - let availability = self.data_availability_checker.put_gossip_data_columns( - slot, - block_root, - data_columns, - )?; + let availability = self + .data_availability_checker + .put_gossip_data_columns(block_root, data_columns)?; - self.process_availability(slot, availability, publish_fn) + self.process_availability(slot, availability, None, publish_fn) .await } + fn check_blobs_for_slashability( + self: &Arc, + block_root: Hash256, + blobs: &FixedBlobSidecarList, + ) -> Result<(), BlockError> { + let mut slashable_cache = self.observed_slashable.write(); + for header in blobs + .iter() + .filter_map(|b| b.as_ref().map(|b| b.signed_block_header.clone())) + .unique() + { + if verify_header_signature::(self, &header).is_ok() { + slashable_cache + .observe_slashable( + header.message.slot, + header.message.proposer_index, + block_root, + ) + .map_err(|e| 
BlockError::BeaconChainError(e.into()))?; + if let Some(slasher) = self.slasher.as_ref() { + slasher.accept_block_header(header); + } + } + } + Ok(()) + } + /// Checks if the provided blobs can make any cached blocks available, and imports immediately /// if so, otherwise caches the blob in the data availability checker. async fn check_rpc_blob_availability_and_import( @@ -3500,35 +3559,28 @@ impl BeaconChain { block_root: Hash256, blobs: FixedBlobSidecarList, ) -> Result { - // Need to scope this to ensure the lock is dropped before calling `process_availability` - // Even an explicit drop is not enough to convince the borrow checker. - { - let mut slashable_cache = self.observed_slashable.write(); - for header in blobs - .iter() - .filter_map(|b| b.as_ref().map(|b| b.signed_block_header.clone())) - .unique() - { - if verify_header_signature::(self, &header).is_ok() { - slashable_cache - .observe_slashable( - header.message.slot, - header.message.proposer_index, - block_root, - ) - .map_err(|e| BlockError::BeaconChainError(e.into()))?; - if let Some(slasher) = self.slasher.as_ref() { - slasher.accept_block_header(header); - } - } - } - } - let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + self.check_blobs_for_slashability(block_root, &blobs)?; let availability = self .data_availability_checker - .put_rpc_blobs(block_root, epoch, blobs)?; + .put_rpc_blobs(block_root, blobs)?; - self.process_availability(slot, availability, || Ok(())) + self.process_availability(slot, availability, None, || Ok(())) + .await + } + + async fn check_engine_blob_availability_and_import( + self: &Arc, + slot: Slot, + block_root: Hash256, + blobs: FixedBlobSidecarList, + data_column_recv: Option>>, + ) -> Result { + self.check_blobs_for_slashability(block_root, &blobs)?; + let availability = self + .data_availability_checker + .put_engine_blobs(block_root, blobs)?; + + self.process_availability(slot, availability, data_column_recv, || Ok(())) .await } @@ -3564,13 +3616,11 @@ impl BeaconChain { // This slot value is purely informative for the consumers of // `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot. 
- let availability = self.data_availability_checker.put_rpc_custody_columns( - block_root, - slot.epoch(T::EthSpec::slots_per_epoch()), - custody_columns, - )?; + let availability = self + .data_availability_checker + .put_rpc_custody_columns(block_root, custody_columns)?; - self.process_availability(slot, availability, || Ok(())) + self.process_availability(slot, availability, None, || Ok(())) .await } @@ -3582,13 +3632,14 @@ impl BeaconChain { self: &Arc, slot: Slot, availability: Availability, + recv: Option>>, publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result { match availability { Availability::Available(block) => { publish_fn()?; // Block is fully available, import into fork choice - self.import_available_block(block).await + self.import_available_block(block, recv).await } Availability::MissingComponents(block_root) => Ok( AvailabilityProcessingStatus::MissingComponents(slot, block_root), @@ -3599,6 +3650,7 @@ impl BeaconChain { pub async fn import_available_block( self: &Arc, block: Box>, + data_column_recv: Option>>, ) -> Result { let AvailableExecutedBlock { block, @@ -3640,6 +3692,7 @@ impl BeaconChain { parent_block, parent_eth1_finalization_data, consensus_context, + data_column_recv, ) }, "payload_verification_handle", @@ -3678,6 +3731,7 @@ impl BeaconChain { parent_block: SignedBlindedBeaconBlock, parent_eth1_finalization_data: Eth1FinalizationData, mut consensus_context: ConsensusContext, + data_column_recv: Option>>, ) -> Result { // ----------------------------- BLOCK NOT YET ATTESTABLE ---------------------------------- // Everything in this initial section is on the hot path between processing the block and @@ -3823,7 +3877,6 @@ impl BeaconChain { // state if we returned early without committing. In other words, an error here would // corrupt the node's database permanently. // ----------------------------------------------------------------------------------------- - self.import_block_update_shuffling_cache(block_root, &mut state); self.import_block_observe_attestations( block, @@ -3840,15 +3893,53 @@ impl BeaconChain { ); self.import_block_update_slasher(block, &state, &mut consensus_context); - let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE); - // Store the block and its state, and execute the confirmation batch for the intermediate // states, which will delete their temporary flags. // If the write fails, revert fork choice to the version from disk, else we can // end up with blocks in fork choice that are missing from disk. // See https://github.com/sigp/lighthouse/issues/2028 let (_, signed_block, blobs, data_columns) = signed_block.deconstruct(); + // TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non + // custody columns: https://github.com/sigp/lighthouse/issues/6465 let custody_columns_count = self.data_availability_checker.get_sampling_column_count(); + // If the block is made available via blobs, drop the data columns. + let data_columns = data_columns.filter(|columns| columns.len() == custody_columns_count); + + let data_columns = match (data_columns, data_column_recv) { + // If the block was made available via custody columns received from gossip / rpc, use them + // since we already have them. + (Some(columns), _) => Some(columns), + // Otherwise, the blobs were likely made available by fetching from the EL; in this case we + // wait for the data columns to be computed (blocking).
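+ // Note: waiting with `blocking_recv` is acceptable here because `import_block` runs + // inside a blocking task (see the `payload_verification_handle` spawn in + // `import_available_block` above), so the async executor is not stalled while we wait.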
+ (None, Some(mut data_column_recv)) => { + let _column_recv_timer = + metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT); + // If we fail to receive data columns from the sender, the sender was either dropped + // or failed to compute data columns from blobs. In that case we restore fork choice + // and return, to avoid inconsistency in the database. + if let Some(columns) = data_column_recv.blocking_recv() { + Some(columns) + } else { + let err_msg = "Did not receive data columns from sender"; + error!( + self.log, + "Failed to store data columns into the database"; + "msg" => "Restoring fork choice from disk", + "error" => err_msg, + ); + return Err(self + .handle_import_block_db_write_error(fork_choice) + .err() + .unwrap_or(BlockError::InternalError(err_msg.to_string()))); + } + } + // No data columns are present and the compute-data-columns task was not spawned: + // either there are no blobs in the block, or we are before PeerDAS activation. + (None, None) => None, + }; + let block = signed_block.message(); + let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE); ops.extend( confirmed_state_roots .into_iter() @@ -3890,33 +3981,10 @@ impl BeaconChain { "msg" => "Restoring fork choice from disk", "error" => ?e, ); - - // Clear the early attester cache to prevent attestations which we would later be unable - // to verify due to the failure. - self.early_attester_cache.clear(); - - // Since the write failed, try to revert the canonical head back to what was stored - // in the database. This attempts to prevent inconsistency between the database and - // fork choice. - if let Err(e) = self.canonical_head.restore_from_store( - fork_choice, - ResetPayloadStatuses::always_reset_conditionally( - self.config.always_reset_payload_statuses, - ), - &self.store, - &self.spec, - &self.log, - ) { - crit!( - self.log, - "No stored fork choice found to restore from"; - "error" => ?e, - "warning" => "The database is likely corrupt now, consider --purge-db" - ); - return Err(BlockError::BeaconChainError(e)); - } - - return Err(e.into()); + return Err(self + .handle_import_block_db_write_error(fork_choice) + .err() + .unwrap_or(e.into())); } drop(txn_lock); @@ -3984,6 +4052,41 @@ impl BeaconChain { Ok(block_root) } + fn handle_import_block_db_write_error( + &self, + // We don't actually need this value; however, it's always present when we call this function, + // and it needs to be dropped to prevent a deadlock. Requiring it to be passed here is + // defensive programming. + fork_choice_write_lock: RwLockWriteGuard>, + ) -> Result<(), BlockError> { + // Clear the early attester cache to prevent attestations which we would later be unable + // to verify due to the failure. + self.early_attester_cache.clear(); + + // Since the write failed, try to revert the canonical head back to what was stored + // in the database. This attempts to prevent inconsistency between the database and + // fork choice. + if let Err(e) = self.canonical_head.restore_from_store( + fork_choice_write_lock, + ResetPayloadStatuses::always_reset_conditionally( + self.config.always_reset_payload_statuses, + ), + &self.store, + &self.spec, + &self.log, + ) { + crit!( + self.log, + "No stored fork choice found to restore from"; + "error" => ?e, + "warning" => "The database is likely corrupt now, consider --purge-db" + ); + Err(BlockError::BeaconChainError(e)) + } else { + Ok(()) + } + } + /// Check the block's consistency with any configured weak subjectivity checkpoint.
fn check_block_against_weak_subjectivity_checkpoint( &self, diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 743748a76d9..6c87deb8260 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -1,5 +1,6 @@ use derivative::Derivative; use slot_clock::SlotClock; +use std::marker::PhantomData; use std::sync::Arc; use crate::beacon_chain::{BeaconChain, BeaconChainTypes}; @@ -8,11 +9,11 @@ use crate::block_verification::{ BlockSlashInfo, }; use crate::kzg_utils::{validate_blob, validate_blobs}; +use crate::observed_data_sidecars::{DoNotObserve, ObservationStrategy, Observe}; use crate::{metrics, BeaconChainError}; use kzg::{Error as KzgError, Kzg, KzgCommitment}; use slog::debug; use ssz_derive::{Decode, Encode}; -use ssz_types::VariableList; use std::time::Duration; use tree_hash::TreeHash; use types::blob_sidecar::BlobIdentifier; @@ -156,20 +157,16 @@ impl From for GossipBlobError { } } -pub type GossipVerifiedBlobList = VariableList< - GossipVerifiedBlob, - <::EthSpec as EthSpec>::MaxBlobsPerBlock, ->; - /// A wrapper around a `BlobSidecar` that indicates it has been approved for re-gossiping on /// the p2p network. #[derive(Debug)] -pub struct GossipVerifiedBlob { +pub struct GossipVerifiedBlob { block_root: Hash256, blob: KzgVerifiedBlob, + _phantom: PhantomData, } -impl GossipVerifiedBlob { +impl GossipVerifiedBlob { pub fn new( blob: Arc>, subnet_id: u64, @@ -178,7 +175,7 @@ impl GossipVerifiedBlob { let header = blob.signed_block_header.clone(); // We only process slashing info if the gossip verification failed // since we do not process the blob any further in that case. - validate_blob_sidecar_for_gossip(blob, subnet_id, chain).map_err(|e| { + validate_blob_sidecar_for_gossip::(blob, subnet_id, chain).map_err(|e| { process_block_slash_info::<_, GossipBlobError>( chain, BlockSlashInfo::from_early_error_blob(header, e), @@ -195,6 +192,7 @@ impl GossipVerifiedBlob { blob, seen_timestamp: Duration::from_secs(0), }, + _phantom: PhantomData, } } pub fn id(&self) -> BlobIdentifier { @@ -335,6 +333,25 @@ impl KzgVerifiedBlobList { verified_blobs: blobs, }) } + + /// Create a `KzgVerifiedBlobList` from `blobs` that are already KZG verified. + /// + /// This should be used with caution, as used incorrectly it could result in KZG verification + /// being skipped and invalid blobs being deemed valid. 
+ pub fn from_verified>>>( + blobs: I, + seen_timestamp: Duration, + ) -> Self { + Self { + verified_blobs: blobs + .into_iter() + .map(|blob| KzgVerifiedBlob { + blob, + seen_timestamp, + }) + .collect(), + } + } } impl IntoIterator for KzgVerifiedBlobList { @@ -364,11 +381,11 @@ where validate_blobs::(kzg, commitments.as_slice(), blobs, proofs.as_slice()) } -pub fn validate_blob_sidecar_for_gossip( +pub fn validate_blob_sidecar_for_gossip( blob_sidecar: Arc>, subnet: u64, chain: &BeaconChain, -) -> Result, GossipBlobError> { +) -> Result, GossipBlobError> { let blob_slot = blob_sidecar.slot(); let blob_index = blob_sidecar.index; let block_parent_root = blob_sidecar.block_parent_root(); @@ -568,16 +585,45 @@ pub fn validate_blob_sidecar_for_gossip( ) .map_err(|e| GossipBlobError::BeaconChainError(e.into()))?; + if O::observe() { + observe_gossip_blob(&kzg_verified_blob.blob, chain)?; + } + + Ok(GossipVerifiedBlob { + block_root, + blob: kzg_verified_blob, + _phantom: PhantomData, + }) +} + +impl GossipVerifiedBlob { + pub fn observe( + self, + chain: &BeaconChain, + ) -> Result, GossipBlobError> { + observe_gossip_blob(&self.blob.blob, chain)?; + Ok(GossipVerifiedBlob { + block_root: self.block_root, + blob: self.blob, + _phantom: PhantomData, + }) + } +} + +fn observe_gossip_blob( + blob_sidecar: &BlobSidecar, + chain: &BeaconChain, +) -> Result<(), GossipBlobError> { // Now the signature is valid, store the proposal so we don't accept another blob sidecar - // with the same `BlobIdentifier`. - // It's important to double-check that the proposer still hasn't been observed so we don't - // have a race-condition when verifying two blocks simultaneously. + // with the same `BlobIdentifier`. It's important to double-check that the proposer still + // hasn't been observed so we don't have a race-condition when verifying two blocks + // simultaneously. // - // Note: If this BlobSidecar goes on to fail full verification, we do not evict it from the seen_cache - // as alternate blob_sidecars for the same identifier can still be retrieved - // over rpc. Evicting them from this cache would allow faster propagation over gossip. So we allow - // retrieval of potentially valid blocks over rpc, but try to punish the proposer for signing - // invalid messages. Issue for more background + // Note: If this BlobSidecar goes on to fail full verification, we do not evict it from the + // seen_cache as alternate blob_sidecars for the same identifier can still be retrieved over + // rpc. Evicting them from this cache would allow faster propagation over gossip. So we + // allow retrieval of potentially valid blocks over rpc, but try to punish the proposer for + // signing invalid messages. Issue for more background // https://github.com/ethereum/consensus-specs/issues/3261 if chain .observed_blob_sidecars @@ -586,16 +632,12 @@ pub fn validate_blob_sidecar_for_gossip( .map_err(|e| GossipBlobError::BeaconChainError(e.into()))? { return Err(GossipBlobError::RepeatBlob { - proposer: proposer_index as u64, - slot: blob_slot, - index: blob_index, + proposer: blob_sidecar.block_proposer_index(), + slot: blob_sidecar.slot(), + index: blob_sidecar.index, }); } - - Ok(GossipVerifiedBlob { - block_root, - blob: kzg_verified_blob, - }) + Ok(()) } /// Returns the canonical root of the given `blob`. 
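The `DoNotObserve` marker introduced above enables a two-phase flow: fully verify a sidecar (signature and KZG checks) without touching the seen-cache, then observe it just before publishing, so copies arriving on gossip in the meantime are still accepted and propagated. A minimal sketch of how a caller might drive this flow using the types added in this diff; the `verify_then_publish` wrapper and its publish step are illustrative, not part of the change:

use std::sync::Arc;
use crate::beacon_chain::{BeaconChain, BeaconChainTypes};
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::observed_data_sidecars::DoNotObserve;
use types::BlobSidecar;

fn verify_then_publish<T: BeaconChainTypes>(
    blob: Arc<BlobSidecar<T::EthSpec>>,
    subnet_id: u64,
    chain: &BeaconChain<T>,
) -> Result<(), GossipBlobError> {
    // Phase 1: full gossip verification; the seen-cache is left untouched.
    let unobserved = GossipVerifiedBlob::<T, DoNotObserve>::new(blob, subnet_id, chain)?;
    // Phase 2: record the sidecar in the seen-cache immediately before publishing.
    // A `RepeatBlob` error means gossip delivered it first, so skip republishing.
    match unobserved.observe(chain) {
        Ok(_observed) => {
            // Publish `_observed` over gossip here.
            Ok(())
        }
        Err(GossipBlobError::RepeatBlob { .. }) => Ok(()),
        Err(e) => Err(e),
    }
}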
diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 527462ab64c..92eb45f9b01 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -683,7 +683,7 @@ pub struct SignatureVerifiedBlock { consensus_context: ConsensusContext, } -/// Used to await the result of executing payload with a remote EE. +/// Used to await the result of executing payload with an EE. type PayloadVerificationHandle = JoinHandle>>; /// A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and @@ -750,7 +750,8 @@ pub fn build_blob_data_column_sidecars( &metrics::DATA_COLUMN_SIDECAR_COMPUTATION, &[&blobs.len().to_string()], ); - let sidecars = blobs_to_data_column_sidecars(&blobs, block, &chain.kzg, &chain.spec) + let blob_refs = blobs.iter().collect::>(); + let sidecars = blobs_to_data_column_sidecars(&blob_refs, block, &chain.kzg, &chain.spec) .discard_timer_on_break(&mut timer)?; drop(timer); Ok(sidecars) @@ -1343,7 +1344,6 @@ impl ExecutionPendingBlock { /* * Perform cursory checks to see if the block is even worth processing. */ - check_block_relevancy(block.as_block(), block_root, chain)?; // Define a future that will verify the execution payload with an execution engine. diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs index 20edfbf31a4..b8a607c8864 100644 --- a/beacon_node/beacon_chain/src/chain_config.rs +++ b/beacon_node/beacon_chain/src/chain_config.rs @@ -88,6 +88,12 @@ pub struct ChainConfig { pub malicious_withhold_count: usize, /// Enable peer sampling on blocks. pub enable_sampling: bool, + /// Number of batches that the node splits blobs or data columns into during publication. + /// This doesn't apply if the node is the block proposer. For PeerDAS only. + pub blob_publication_batches: usize, + /// The delay in milliseconds applied by the node between sending each blob or data column batch. + /// This doesn't apply if the node is the block proposer. + pub blob_publication_batch_interval: Duration, } impl Default for ChainConfig { @@ -121,6 +127,8 @@ impl Default for ChainConfig { enable_light_client_server: false, malicious_withhold_count: 0, enable_sampling: false, + blob_publication_batches: 4, + blob_publication_batch_interval: Duration::from_millis(300), } } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 047764d705c..72806a74d27 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -18,7 +18,7 @@ use task_executor::TaskExecutor; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{ BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList, - Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, Slot, + Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, }; mod error; @@ -146,6 +146,10 @@ impl DataAvailabilityChecker { self.availability_cache.sampling_column_count() } + pub(crate) fn is_supernode(&self) -> bool { + self.get_sampling_column_count() == self.spec.number_of_columns + } + /// Checks if the block root is currently in the availability cache awaiting import because /// of missing components.
pub fn get_execution_valid_block( @@ -201,7 +205,6 @@ impl DataAvailabilityChecker { pub fn put_rpc_blobs( &self, block_root: Hash256, - epoch: Epoch, blobs: FixedBlobSidecarList, ) -> Result, AvailabilityCheckError> { let seen_timestamp = self @@ -212,15 +215,12 @@ impl DataAvailabilityChecker { // Note: currently not reporting which specific blob is invalid because we fetch all blobs // from the same peer for both lookup and range sync. - let verified_blobs = KzgVerifiedBlobList::new( - Vec::from(blobs).into_iter().flatten(), - &self.kzg, - seen_timestamp, - ) - .map_err(AvailabilityCheckError::InvalidBlobs)?; + let verified_blobs = + KzgVerifiedBlobList::new(blobs.iter().flatten().cloned(), &self.kzg, seen_timestamp) + .map_err(AvailabilityCheckError::InvalidBlobs)?; self.availability_cache - .put_kzg_verified_blobs(block_root, epoch, verified_blobs, &self.log) + .put_kzg_verified_blobs(block_root, verified_blobs, &self.log) } /// Put a list of custody columns received via RPC into the availability cache. This performs KZG @@ -229,7 +229,6 @@ impl DataAvailabilityChecker { pub fn put_rpc_custody_columns( &self, block_root: Hash256, - epoch: Epoch, custody_columns: DataColumnSidecarList, ) -> Result, AvailabilityCheckError> { // TODO(das): report which column is invalid for proper peer scoring @@ -248,12 +247,32 @@ impl DataAvailabilityChecker { self.availability_cache.put_kzg_verified_data_columns( block_root, - epoch, verified_custody_columns, &self.log, ) } + /// Put a list of blobs received from the EL pool into the availability cache. + /// + /// This DOES NOT perform KZG verification because the KZG proofs should have been constructed + /// immediately prior to calling this function so they are assumed to be valid. + pub fn put_engine_blobs( + &self, + block_root: Hash256, + blobs: FixedBlobSidecarList, + ) -> Result, AvailabilityCheckError> { + let seen_timestamp = self + .slot_clock + .now_duration() + .ok_or(AvailabilityCheckError::SlotClockError)?; + + let verified_blobs = + KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp); + + self.availability_cache + .put_kzg_verified_blobs(block_root, verified_blobs, &self.log) + } + /// Check if we've cached other blobs for this block. If it completes a set and we also /// have a block cached, return the `Availability` variant triggering block import. /// Otherwise cache the blob sidecar. 
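Because `put_engine_blobs` wraps its input with `KzgVerifiedBlobList::from_verified` (added in blob_verification.rs above), no KZG proofs are checked at this point; soundness rests entirely on the caller having verified the proofs immediately beforehand, which `fetch_and_process_engine_blobs` does via gossip verification. A minimal sketch contrasting the two construction paths, with illustrative helper names and the error type implied by the definitions above:

use std::sync::Arc;
use std::time::Duration;
use kzg::{Error as KzgError, Kzg};
use types::{BlobSidecar, EthSpec};
use crate::blob_verification::KzgVerifiedBlobList;

// RPC blobs come from untrusted peers, so every KZG proof is verified.
fn list_from_rpc<E: EthSpec>(
    kzg: &Kzg,
    blobs: Vec<Arc<BlobSidecar<E>>>,
    seen_timestamp: Duration,
) -> Result<KzgVerifiedBlobList<E>, KzgError> {
    KzgVerifiedBlobList::new(blobs, kzg, seen_timestamp)
}

// Engine blobs were KZG-verified moments earlier, so verification is skipped.
// This is only sound while every call site upholds that invariant.
fn list_from_engine<E: EthSpec>(
    blobs: Vec<Arc<BlobSidecar<E>>>,
    seen_timestamp: Duration,
) -> KzgVerifiedBlobList<E> {
    KzgVerifiedBlobList::from_verified(blobs, seen_timestamp)
}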
@@ -265,7 +284,6 @@ impl DataAvailabilityChecker { ) -> Result, AvailabilityCheckError> { self.availability_cache.put_kzg_verified_blobs( gossip_blob.block_root(), - gossip_blob.epoch(), vec![gossip_blob.into_inner()], &self.log, ) @@ -279,12 +297,9 @@ impl DataAvailabilityChecker { #[allow(clippy::type_complexity)] pub fn put_gossip_data_columns( &self, - slot: Slot, block_root: Hash256, gossip_data_columns: Vec>, ) -> Result, AvailabilityCheckError> { - let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); - let custody_columns = gossip_data_columns .into_iter() .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())) @@ -292,7 +307,6 @@ impl DataAvailabilityChecker { self.availability_cache.put_kzg_verified_data_columns( block_root, - epoch, custody_columns, &self.log, ) @@ -595,12 +609,7 @@ impl DataAvailabilityChecker { ); self.availability_cache - .put_kzg_verified_data_columns( - *block_root, - slot.epoch(T::EthSpec::slots_per_epoch()), - data_columns_to_publish.clone(), - &self.log, - ) + .put_kzg_verified_data_columns(*block_root, data_columns_to_publish.clone(), &self.log) .map(|availability| { DataColumnReconstructionResult::Success(( availability, diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index dbfa00e6e22..cfdb3cfe91e 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -10,7 +10,6 @@ pub enum Error { blob_commitment: KzgCommitment, block_commitment: KzgCommitment, }, - UnableToDetermineImportRequirement, Unexpected, SszTypes(ssz_types::Error), MissingBlobs, @@ -44,7 +43,6 @@ impl Error { | Error::Unexpected | Error::ParentStateMissing(_) | Error::BlockReplayError(_) - | Error::UnableToDetermineImportRequirement | Error::RebuildingStateCaches(_) | Error::SlotClockError => ErrorCategory::Internal, Error::InvalidBlobs { .. } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 6d4636e8ed8..40361574aff 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -10,7 +10,7 @@ use crate::BeaconChainTypes; use lru::LruCache; use parking_lot::RwLock; use slog::{debug, Logger}; -use ssz_types::{FixedVector, VariableList}; +use ssz_types::FixedVector; use std::num::NonZeroUsize; use std::sync::Arc; use types::blob_sidecar::BlobIdentifier; @@ -34,11 +34,6 @@ pub struct PendingComponents { pub reconstruction_started: bool, } -pub enum BlockImportRequirement { - AllBlobs, - ColumnSampling(usize), -} - impl PendingComponents { /// Returns an immutable reference to the cached block. pub fn get_cached_block(&self) -> &Option> { @@ -199,63 +194,49 @@ impl PendingComponents { /// /// Returns `true` if both the block exists and the number of received blobs / custody columns /// matches the number of expected blobs / custody columns. 
- pub fn is_available( - &self, - block_import_requirement: &BlockImportRequirement, - log: &Logger, - ) -> bool { + pub fn is_available(&self, custody_column_count: usize, log: &Logger) -> bool { let block_kzg_commitments_count_opt = self.block_kzg_commitments_count(); + let expected_blobs_msg = block_kzg_commitments_count_opt + .as_ref() + .map(|num| num.to_string()) + .unwrap_or("unknown".to_string()); - match block_import_requirement { - BlockImportRequirement::AllBlobs => { - let received_blobs = self.num_received_blobs(); - let expected_blobs_msg = block_kzg_commitments_count_opt - .as_ref() - .map(|num| num.to_string()) - .unwrap_or("unknown".to_string()); - - debug!(log, - "Component(s) added to data availability checker"; - "block_root" => ?self.block_root, - "received_block" => block_kzg_commitments_count_opt.is_some(), - "received_blobs" => received_blobs, - "expected_blobs" => expected_blobs_msg, - ); - - block_kzg_commitments_count_opt.map_or(false, |num_expected_blobs| { - num_expected_blobs == received_blobs - }) + // No data columns when there are 0 blobs + let expected_columns_opt = block_kzg_commitments_count_opt.map(|blob_count| { + if blob_count > 0 { + custody_column_count + } else { + 0 } - BlockImportRequirement::ColumnSampling(num_expected_columns) => { - // No data columns when there are 0 blobs - let expected_columns_opt = block_kzg_commitments_count_opt.map(|blob_count| { - if blob_count > 0 { - *num_expected_columns - } else { - 0 - } - }); + }); + let expected_columns_msg = expected_columns_opt + .as_ref() + .map(|num| num.to_string()) + .unwrap_or("unknown".to_string()); - let expected_columns_msg = expected_columns_opt - .as_ref() - .map(|num| num.to_string()) - .unwrap_or("unknown".to_string()); + let num_received_blobs = self.num_received_blobs(); + let num_received_columns = self.num_received_data_columns(); - let num_received_columns = self.num_received_data_columns(); + debug!( + log, + "Component(s) added to data availability checker"; + "block_root" => ?self.block_root, + "received_blobs" => num_received_blobs, + "expected_blobs" => expected_blobs_msg, + "received_columns" => num_received_columns, + "expected_columns" => expected_columns_msg, + ); - debug!(log, - "Component(s) added to data availability checker"; - "block_root" => ?self.block_root, - "received_block" => block_kzg_commitments_count_opt.is_some(), - "received_columns" => num_received_columns, - "expected_columns" => expected_columns_msg, - ); + let all_blobs_received = block_kzg_commitments_count_opt + .map_or(false, |num_expected_blobs| { + num_expected_blobs == num_received_blobs + }); - expected_columns_opt.map_or(false, |num_expected_columns| { - num_expected_columns == num_received_columns - }) - } - } + let all_columns_received = expected_columns_opt.map_or(false, |num_expected_columns| { + num_expected_columns == num_received_columns + }); + + all_blobs_received || all_columns_received } /// Returns an empty `PendingComponents` object with the given block root. @@ -277,7 +258,6 @@ impl PendingComponents { /// reconstructed from disk. Ensure you are not holding any write locks while calling this. 
pub fn make_available( self, - block_import_requirement: BlockImportRequirement, spec: &Arc, recover: R, ) -> Result, AvailabilityCheckError> @@ -304,26 +284,25 @@ impl PendingComponents { return Err(AvailabilityCheckError::Unexpected); }; - let (blobs, data_columns) = match block_import_requirement { - BlockImportRequirement::AllBlobs => { - let num_blobs_expected = diet_executed_block.num_blobs_expected(); - let Some(verified_blobs) = verified_blobs - .into_iter() - .map(|b| b.map(|b| b.to_blob())) - .take(num_blobs_expected) - .collect::>>() - else { - return Err(AvailabilityCheckError::Unexpected); - }; - (Some(VariableList::new(verified_blobs)?), None) - } - BlockImportRequirement::ColumnSampling(_) => { - let verified_data_columns = verified_data_columns - .into_iter() - .map(|d| d.into_inner()) - .collect(); - (None, Some(verified_data_columns)) - } + let is_peer_das_enabled = spec.is_peer_das_enabled_for_epoch(diet_executed_block.epoch()); + let (blobs, data_columns) = if is_peer_das_enabled { + let data_columns = verified_data_columns + .into_iter() + .map(|d| d.into_inner()) + .collect::>(); + (None, Some(data_columns)) + } else { + let num_blobs_expected = diet_executed_block.num_blobs_expected(); + let Some(verified_blobs) = verified_blobs + .into_iter() + .map(|b| b.map(|b| b.to_blob())) + .take(num_blobs_expected) + .collect::>>() + .map(Into::into) + else { + return Err(AvailabilityCheckError::Unexpected); + }; + (Some(verified_blobs), None) }; let executed_block = recover(diet_executed_block)?; @@ -475,24 +454,9 @@ impl DataAvailabilityCheckerInner { f(self.critical.read().peek(block_root)) } - fn block_import_requirement( - &self, - epoch: Epoch, - ) -> Result { - let peer_das_enabled = self.spec.is_peer_das_enabled_for_epoch(epoch); - if peer_das_enabled { - Ok(BlockImportRequirement::ColumnSampling( - self.sampling_column_count, - )) - } else { - Ok(BlockImportRequirement::AllBlobs) - } - } - pub fn put_kzg_verified_blobs>>( &self, block_root: Hash256, - epoch: Epoch, kzg_verified_blobs: I, log: &Logger, ) -> Result, AvailabilityCheckError> { @@ -515,12 +479,11 @@ impl DataAvailabilityCheckerInner { // Merge in the blobs. pending_components.merge_blobs(fixed_blobs); - let block_import_requirement = self.block_import_requirement(epoch)?; - if pending_components.is_available(&block_import_requirement, log) { + if pending_components.is_available(self.sampling_column_count, log) { write_lock.put(block_root, pending_components.clone()); // No need to hold the write lock anymore drop(write_lock); - pending_components.make_available(block_import_requirement, &self.spec, |diet_block| { + pending_components.make_available(&self.spec, |diet_block| { self.state_cache.recover_pending_executed_block(diet_block) }) } else { @@ -535,7 +498,6 @@ impl DataAvailabilityCheckerInner { >( &self, block_root: Hash256, - epoch: Epoch, kzg_verified_data_columns: I, log: &Logger, ) -> Result, AvailabilityCheckError> { @@ -550,13 +512,11 @@ impl DataAvailabilityCheckerInner { // Merge in the data columns. 
pending_components.merge_data_columns(kzg_verified_data_columns)?; - let block_import_requirement = self.block_import_requirement(epoch)?; - - if pending_components.is_available(&block_import_requirement, log) { + if pending_components.is_available(self.sampling_column_count, log) { write_lock.put(block_root, pending_components.clone()); // No need to hold the write lock anymore drop(write_lock); - pending_components.make_available(block_import_requirement, &self.spec, |diet_block| { + pending_components.make_available(&self.spec, |diet_block| { self.state_cache.recover_pending_executed_block(diet_block) }) } else { @@ -625,7 +585,6 @@ impl DataAvailabilityCheckerInner { ) -> Result, AvailabilityCheckError> { let mut write_lock = self.critical.write(); let block_root = executed_block.import_data.block_root; - let epoch = executed_block.block.epoch(); // register the block to get the diet block let diet_executed_block = self @@ -642,12 +601,11 @@ impl DataAvailabilityCheckerInner { pending_components.merge_block(diet_executed_block); // Check if we have all components and entire set is consistent. - let block_import_requirement = self.block_import_requirement(epoch)?; - if pending_components.is_available(&block_import_requirement, log) { + if pending_components.is_available(self.sampling_column_count, log) { write_lock.put(block_root, pending_components.clone()); // No need to hold the write lock anymore drop(write_lock); - pending_components.make_available(block_import_requirement, &self.spec, |diet_block| { + pending_components.make_available(&self.spec, |diet_block| { self.state_cache.recover_pending_executed_block(diet_block) }) } else { @@ -703,6 +661,7 @@ impl DataAvailabilityCheckerInner { #[cfg(test)] mod test { use super::*; + use crate::{ blob_verification::GossipVerifiedBlob, block_verification::PayloadVerificationOutcome, @@ -712,6 +671,7 @@ mod test { test_utils::{BaseHarnessType, BeaconChainHarness, DiskHarnessType}, }; use fork_choice::PayloadVerificationStatus; + use logging::test_logger; use slog::{info, Logger}; use state_processing::ConsensusContext; @@ -931,7 +891,6 @@ mod test { let (pending_block, blobs) = availability_pending_block(&harness).await; let root = pending_block.import_data.block_root; - let epoch = pending_block.block.epoch(); let blobs_expected = pending_block.num_blobs_expected(); assert_eq!( @@ -980,7 +939,7 @@ mod test { for (blob_index, gossip_blob) in blobs.into_iter().enumerate() { kzg_verified_blobs.push(gossip_blob.into_inner()); let availability = cache - .put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone(), harness.logger()) + .put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), harness.logger()) .expect("should put blob"); if blob_index == blobs_expected - 1 { assert!(matches!(availability, Availability::Available(_))); @@ -1002,12 +961,11 @@ mod test { "should have expected number of blobs" ); let root = pending_block.import_data.block_root; - let epoch = pending_block.block.epoch(); let mut kzg_verified_blobs = vec![]; for gossip_blob in blobs { kzg_verified_blobs.push(gossip_blob.into_inner()); let availability = cache - .put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone(), harness.logger()) + .put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), harness.logger()) .expect("should put blob"); assert_eq!( availability, diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs index 03e3289118d..5b9b7c70233 
100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs @@ -57,6 +57,11 @@ impl DietAvailabilityPendingExecutedBlock { .cloned() .unwrap_or_default() } + + /// Returns the epoch corresponding to `self.slot()`. + pub fn epoch(&self) -> Epoch { + self.block.slot().epoch(E::slots_per_epoch()) + } } /// This LRU cache holds BeaconStates used for block import. If the cache overflows, diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index a4e83b27514..6cfd26786aa 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -3,6 +3,7 @@ use crate::block_verification::{ BlockSlashInfo, }; use crate::kzg_utils::{reconstruct_data_columns, validate_data_columns}; +use crate::observed_data_sidecars::{ObservationStrategy, Observe}; use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes}; use derivative::Derivative; use fork_choice::ProtoBlock; @@ -13,6 +14,7 @@ use slog::debug; use slot_clock::SlotClock; use ssz_derive::{Decode, Encode}; use std::iter; +use std::marker::PhantomData; use std::sync::Arc; use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier}; use types::{ @@ -160,17 +162,16 @@ impl From for GossipDataColumnError { } } -pub type GossipVerifiedDataColumnList = RuntimeVariableList>; - /// A wrapper around a `DataColumnSidecar` that indicates it has been approved for re-gossiping on /// the p2p network. #[derive(Debug)] -pub struct GossipVerifiedDataColumn { +pub struct GossipVerifiedDataColumn { block_root: Hash256, data_column: KzgVerifiedDataColumn, + _phantom: PhantomData, } -impl GossipVerifiedDataColumn { +impl GossipVerifiedDataColumn { pub fn new( column_sidecar: Arc>, subnet_id: u64, @@ -179,12 +180,14 @@ impl GossipVerifiedDataColumn { let header = column_sidecar.signed_block_header.clone(); // We only process slashing info if the gossip verification failed // since we do not process the data column any further in that case. 
- validate_data_column_sidecar_for_gossip(column_sidecar, subnet_id, chain).map_err(|e| { - process_block_slash_info::<_, GossipDataColumnError>( - chain, - BlockSlashInfo::from_early_error_data_column(header, e), - ) - }) + validate_data_column_sidecar_for_gossip::(column_sidecar, subnet_id, chain).map_err( + |e| { + process_block_slash_info::<_, GossipDataColumnError>( + chain, + BlockSlashInfo::from_early_error_data_column(header, e), + ) + }, + ) } pub fn id(&self) -> DataColumnIdentifier { @@ -375,11 +378,11 @@ where Ok(()) } -pub fn validate_data_column_sidecar_for_gossip( +pub fn validate_data_column_sidecar_for_gossip( data_column: Arc>, subnet: u64, chain: &BeaconChain, -) -> Result, GossipDataColumnError> { +) -> Result, GossipDataColumnError> { let column_slot = data_column.slot(); verify_data_column_sidecar(&data_column, &chain.spec)?; verify_index_matches_subnet(&data_column, subnet, &chain.spec)?; @@ -404,9 +407,14 @@ pub fn validate_data_column_sidecar_for_gossip( ) .map_err(|e| GossipDataColumnError::BeaconChainError(e.into()))?; + if O::observe() { + observe_gossip_data_column(&kzg_verified_data_column.data, chain)?; + } + Ok(GossipVerifiedDataColumn { block_root: data_column.block_root(), data_column: kzg_verified_data_column, + _phantom: PhantomData, }) } @@ -648,11 +656,42 @@ fn verify_sidecar_not_from_future_slot( Ok(()) } +pub fn observe_gossip_data_column( + data_column_sidecar: &DataColumnSidecar, + chain: &BeaconChain, +) -> Result<(), GossipDataColumnError> { + // Now the signature is valid, store the proposal so we don't accept another data column sidecar + // with the same `DataColumnIdentifier`. It's important to double-check that the proposer still + // hasn't been observed so we don't have a race-condition when verifying two blocks + // simultaneously. + // + // Note: If this DataColumnSidecar goes on to fail full verification, we do not evict it from the + // seen_cache as alternate data_column_sidecars for the same identifier can still be retrieved over + // rpc. Evicting them from this cache would allow faster propagation over gossip. So we + // allow retrieval of potentially valid blocks over rpc, but try to punish the proposer for + // signing invalid messages. Issue for more background + // https://github.com/ethereum/consensus-specs/issues/3261 + if chain + .observed_column_sidecars + .write() + .observe_sidecar(data_column_sidecar) + .map_err(|e| GossipDataColumnError::BeaconChainError(e.into()))? 
+ { + return Err(GossipDataColumnError::PriorKnown { + proposer: data_column_sidecar.block_proposer_index(), + slot: data_column_sidecar.slot(), + index: data_column_sidecar.index, + }); + } + Ok(()) +} + #[cfg(test)] mod test { use crate::data_column_verification::{ validate_data_column_sidecar_for_gossip, GossipDataColumnError, }; + use crate::observed_data_sidecars::Observe; use crate::test_utils::BeaconChainHarness; use types::{DataColumnSidecar, EthSpec, ForkName, MainnetEthSpec}; @@ -691,8 +730,11 @@ mod test { .unwrap(), }; - let result = - validate_data_column_sidecar_for_gossip(column_sidecar.into(), index, &harness.chain); + let result = validate_data_column_sidecar_for_gossip::<_, Observe>( + column_sidecar.into(), + index, + &harness.chain, + ); assert!(matches!( result.err(), Some(GossipDataColumnError::UnexpectedDataColumn) diff --git a/beacon_node/beacon_chain/src/fetch_blobs.rs b/beacon_node/beacon_chain/src/fetch_blobs.rs new file mode 100644 index 00000000000..46266b036a3 --- /dev/null +++ b/beacon_node/beacon_chain/src/fetch_blobs.rs @@ -0,0 +1,308 @@ +//! This module implements an optimisation to fetch blobs via JSON-RPC from the EL. +//! If a blob has already been seen in the public mempool, then it is often unnecessary to wait for +//! it to arrive on P2P gossip. This optimisation uses a new JSON-RPC method (`engine_getBlobsV1`) which +//! allows the CL to load the blobs quickly from the EL's blob pool. +//! +//! Once the node fetches the blobs from the EL, it then publishes any blobs that it hasn't yet seen +//! on P2P gossip to the network. From PeerDAS onwards, with the increased blob count, +//! broadcasting blobs requires much higher bandwidth, and is only done by high-capacity +//! supernodes. +use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; +use crate::kzg_utils::blobs_to_data_column_sidecars; +use crate::observed_data_sidecars::DoNotObserve; +use crate::{metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError}; +use execution_layer::json_structures::BlobAndProofV1; +use execution_layer::Error as ExecutionLayerError; +use metrics::{inc_counter, inc_counter_by, TryExt}; +use slog::{debug, error, o, Logger}; +use ssz_types::FixedVector; +use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash; +use std::sync::Arc; +use tokio::sync::mpsc::Receiver; +use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList}; +use types::{ + BeaconStateError, BlobSidecar, DataColumnSidecar, DataColumnSidecarList, EthSpec, FullPayload, + Hash256, SignedBeaconBlock, SignedBeaconBlockHeader, +}; + +pub enum BlobsOrDataColumns { + Blobs(Vec>), + DataColumns(DataColumnSidecarList), +} + +#[derive(Debug)] +pub enum FetchEngineBlobError { + BeaconStateError(BeaconStateError), + BlobProcessingError(BlockError), + BlobSidecarError(BlobSidecarError), + ExecutionLayerMissing, + InternalError(String), + GossipBlob(GossipBlobError), + RequestFailed(ExecutionLayerError), + RuntimeShutdown, +} + +/// Fetches blobs from the EL mempool and processes them. It also broadcasts unseen blobs or +/// data columns (PeerDAS onwards) to the network, using the supplied `publish_fn`.
+pub async fn fetch_and_process_engine_blobs( + chain: Arc>, + block_root: Hash256, + block: Arc>>, + publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, +) -> Result, FetchEngineBlobError> { + let block_root_str = format!("{:?}", block_root); + let log = chain + .log + .new(o!("service" => "fetch_engine_blobs", "block_root" => block_root_str)); + + let versioned_hashes = if let Some(kzg_commitments) = block + .message() + .body() + .blob_kzg_commitments() + .ok() + .filter(|blobs| !blobs.is_empty()) + { + kzg_commitments + .iter() + .map(kzg_commitment_to_versioned_hash) + .collect::>() + } else { + debug!( + log, + "Fetch blobs not triggered - none required"; + ); + return Ok(None); + }; + + let num_expected_blobs = versioned_hashes.len(); + + let execution_layer = chain + .execution_layer + .as_ref() + .ok_or(FetchEngineBlobError::ExecutionLayerMissing)?; + + debug!( + log, + "Fetching blobs from the EL"; + "num_expected_blobs" => num_expected_blobs, + ); + let response = execution_layer + .get_blobs(versioned_hashes) + .await + .map_err(FetchEngineBlobError::RequestFailed)?; + + if response.is_empty() { + debug!( + log, + "No blobs fetched from the EL"; + "num_expected_blobs" => num_expected_blobs, + ); + inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL); + return Ok(None); + } else { + inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL); + } + + let (signed_block_header, kzg_commitments_proof) = block + .signed_block_header_and_kzg_commitments_proof() + .map_err(FetchEngineBlobError::BeaconStateError)?; + + let fixed_blob_sidecar_list = build_blob_sidecars( + &block, + response, + signed_block_header, + &kzg_commitments_proof, + )?; + + let num_fetched_blobs = fixed_blob_sidecar_list + .iter() + .filter(|b| b.is_some()) + .count(); + + inc_counter_by( + &metrics::BLOBS_FROM_EL_EXPECTED_TOTAL, + num_expected_blobs as u64, + ); + inc_counter_by( + &metrics::BLOBS_FROM_EL_RECEIVED_TOTAL, + num_fetched_blobs as u64, + ); + + // Gossip verify blobs before publishing. This prevents blobs with invalid KZG proofs from + // the EL making it into the data availability checker. We do not immediately add these + // blobs to the observed blobs/columns cache because we want to allow blobs/columns to arrive on gossip + // and be accepted (and propagated) while we are waiting to publish. Just before publishing + // we will observe the blobs/columns and only proceed with publishing if they are not yet seen. + let blobs_to_import_and_publish = fixed_blob_sidecar_list + .iter() + .filter_map(|opt_blob| { + let blob = opt_blob.as_ref()?; + match GossipVerifiedBlob::::new(blob.clone(), blob.index, &chain) { + Ok(verified) => Some(Ok(verified)), + // Ignore already seen blobs. + Err(GossipBlobError::RepeatBlob { .. }) => None, + Err(e) => Some(Err(e)), + } + }) + .collect::, _>>() + .map_err(FetchEngineBlobError::GossipBlob)?; + + let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch()); + + let data_columns_receiver_opt = if peer_das_enabled { + // Partial blobs response isn't useful for PeerDAS, so we don't bother building and publishing data columns. 
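+ // (Each data column sidecar must carry one cell per blob in the block, so a partial + // set of blobs cannot be extended into any complete column.)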
+ if num_fetched_blobs != num_expected_blobs { + debug!( + log, + "Not all blobs fetched from the EL"; + "info" => "Unable to compute data columns", + "num_fetched_blobs" => num_fetched_blobs, + "num_expected_blobs" => num_expected_blobs, + ); + return Ok(None); + } + + let data_columns_receiver = spawn_compute_and_publish_data_columns_task( + &chain, + block.clone(), + fixed_blob_sidecar_list.clone(), + publish_fn, + log.clone(), + ); + + Some(data_columns_receiver) + } else { + if !blobs_to_import_and_publish.is_empty() { + publish_fn(BlobsOrDataColumns::Blobs(blobs_to_import_and_publish)); + } + + None + }; + + debug!( + log, + "Processing engine blobs"; + "num_fetched_blobs" => num_fetched_blobs, + ); + + let availability_processing_status = chain + .process_engine_blobs( + block.slot(), + block_root, + fixed_blob_sidecar_list.clone(), + data_columns_receiver_opt, + ) + .await + .map_err(FetchEngineBlobError::BlobProcessingError)?; + + Ok(Some(availability_processing_status)) +} + +/// Spawn a blocking task here for long computation tasks, so it doesn't block processing, and it +/// allows blobs / data columns to propagate without waiting for processing. +/// +/// An `mpsc::Sender` is then used to send the produced data columns to the `beacon_chain` for it +/// to be persisted, **after** the block is made attestable. +/// +/// The reason for doing this is to make the block available and attestable as soon as possible, +/// while maintaining the invariant that block and data columns are persisted atomically. +fn spawn_compute_and_publish_data_columns_task( + chain: &Arc>, + block: Arc>>, + blobs: FixedBlobSidecarList, + publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, + log: Logger, +) -> Receiver>>> { + let chain_cloned = chain.clone(); + let (data_columns_sender, data_columns_receiver) = tokio::sync::mpsc::channel(1); + + chain.task_executor.spawn_blocking( + move || { + let mut timer = metrics::start_timer_vec( + &metrics::DATA_COLUMN_SIDECAR_COMPUTATION, + &[&blobs.len().to_string()], + ); + let blob_refs = blobs + .iter() + .filter_map(|b| b.as_ref().map(|b| &b.blob)) + .collect::>(); + let data_columns_result = blobs_to_data_column_sidecars( + &blob_refs, + &block, + &chain_cloned.kzg, + &chain_cloned.spec, + ) + .discard_timer_on_break(&mut timer); + drop(timer); + + let all_data_columns = match data_columns_result { + Ok(d) => d, + Err(e) => { + error!( + log, + "Failed to build data column sidecars from blobs"; + "error" => ?e + ); + return; + } + }; + + if let Err(e) = data_columns_sender.try_send(all_data_columns.clone()) { + error!(log, "Failed to send computed data columns"; "error" => ?e); + }; + + // Check indices from cache before sending the columns, to make sure we don't + // publish components already seen on gossip. + let is_supernode = chain_cloned.data_availability_checker.is_supernode(); + + // At the moment non supernodes are not required to publish any columns. + // TODO(das): we could experiment with having full nodes publish their custodied + // columns here. 
+ if !is_supernode { + return; + } + + publish_fn(BlobsOrDataColumns::DataColumns(all_data_columns)); + }, + "compute_and_publish_data_columns", + ); + + data_columns_receiver +} + +fn build_blob_sidecars( + block: &Arc>>, + response: Vec>>, + signed_block_header: SignedBeaconBlockHeader, + kzg_commitments_inclusion_proof: &FixedVector, +) -> Result, FetchEngineBlobError> { + let mut fixed_blob_sidecar_list = FixedBlobSidecarList::default(); + for (index, blob_and_proof) in response + .into_iter() + .enumerate() + .filter_map(|(i, opt_blob)| Some((i, opt_blob?))) + { + match BlobSidecar::new_with_existing_proof( + index, + blob_and_proof.blob, + block, + signed_block_header.clone(), + kzg_commitments_inclusion_proof, + blob_and_proof.proof, + ) { + Ok(blob) => { + if let Some(blob_mut) = fixed_blob_sidecar_list.get_mut(index) { + *blob_mut = Some(Arc::new(blob)); + } else { + return Err(FetchEngineBlobError::InternalError(format!( + "Blobs from EL contains blob with invalid index {index}" + ))); + } + } + Err(e) => { + return Err(FetchEngineBlobError::BlobSidecarError(e)); + } + } + } + Ok(fixed_blob_sidecar_list) +} diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs index 91c1098f81f..1680c0298d1 100644 --- a/beacon_node/beacon_chain/src/kzg_utils.rs +++ b/beacon_node/beacon_chain/src/kzg_utils.rs @@ -7,8 +7,8 @@ use std::sync::Arc; use types::beacon_block_body::KzgCommitments; use types::data_column_sidecar::{Cell, DataColumn, DataColumnSidecarError}; use types::{ - Blob, BlobsList, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - Hash256, KzgCommitment, KzgProof, KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, + Blob, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, + KzgCommitment, KzgProof, KzgProofs, SignedBeaconBlock, SignedBeaconBlockHeader, }; /// Converts a blob ssz List object to an array to be used with the kzg @@ -146,7 +146,7 @@ pub fn verify_kzg_proof( /// Build data column sidecars from a signed beacon block and its blobs. 
pub fn blobs_to_data_column_sidecars( - blobs: &BlobsList, + blobs: &[&Blob], block: &SignedBeaconBlock, kzg: &Kzg, spec: &ChainSpec, @@ -154,6 +154,7 @@ pub fn blobs_to_data_column_sidecars( if blobs.is_empty() { return Ok(vec![]); } + let kzg_commitments = block .message() .body() @@ -312,19 +313,21 @@ mod test { #[track_caller] fn test_build_data_columns_empty(kzg: &Kzg, spec: &ChainSpec) { let num_of_blobs = 0; - let (signed_block, blob_sidecars) = create_test_block_and_blobs::(num_of_blobs, spec); + let (signed_block, blobs) = create_test_block_and_blobs::(num_of_blobs, spec); + let blob_refs = blobs.iter().collect::>(); let column_sidecars = - blobs_to_data_column_sidecars(&blob_sidecars, &signed_block, kzg, spec).unwrap(); + blobs_to_data_column_sidecars(&blob_refs, &signed_block, kzg, spec).unwrap(); assert!(column_sidecars.is_empty()); } #[track_caller] fn test_build_data_columns(kzg: &Kzg, spec: &ChainSpec) { let num_of_blobs = 6; - let (signed_block, blob_sidecars) = create_test_block_and_blobs::(num_of_blobs, spec); + let (signed_block, blobs) = create_test_block_and_blobs::(num_of_blobs, spec); + let blob_refs = blobs.iter().collect::>(); let column_sidecars = - blobs_to_data_column_sidecars(&blob_sidecars, &signed_block, kzg, spec).unwrap(); + blobs_to_data_column_sidecars(&blob_refs, &signed_block, kzg, spec).unwrap(); let block_kzg_commitments = signed_block .message() @@ -358,9 +361,10 @@ mod test { #[track_caller] fn test_reconstruct_data_columns(kzg: &Kzg, spec: &ChainSpec) { let num_of_blobs = 6; - let (signed_block, blob_sidecars) = create_test_block_and_blobs::(num_of_blobs, spec); + let (signed_block, blobs) = create_test_block_and_blobs::(num_of_blobs, spec); + let blob_refs = blobs.iter().collect::>(); let column_sidecars = - blobs_to_data_column_sidecars(&blob_sidecars, &signed_block, kzg, spec).unwrap(); + blobs_to_data_column_sidecars(&blob_refs, &signed_block, kzg, spec).unwrap(); // Now reconstruct let reconstructed_columns = reconstruct_data_columns( diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index b89c00e0af4..2953516fb1a 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -28,6 +28,7 @@ pub mod eth1_chain; mod eth1_finalization_cache; pub mod events; pub mod execution_payload; +pub mod fetch_blobs; pub mod fork_choice_signal; pub mod fork_revert; pub mod graffiti_calculator; @@ -43,7 +44,7 @@ mod naive_aggregation_pool; pub mod observed_aggregates; mod observed_attesters; pub mod observed_block_producers; -mod observed_data_sidecars; +pub mod observed_data_sidecars; pub mod observed_operations; mod observed_slashable; pub mod otb_verification_service; diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index f73775d678f..66b300f7f2e 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -111,6 +111,13 @@ pub static BLOCK_PROCESSING_POST_EXEC_PROCESSING: LazyLock> = linear_buckets(5e-3, 5e-3, 10), ) }); +pub static BLOCK_PROCESSING_DATA_COLUMNS_WAIT: LazyLock> = LazyLock::new(|| { + try_create_histogram_with_buckets( + "beacon_block_processing_data_columns_wait_seconds", + "Time spent waiting for data columns to be computed before starting database write", + exponential_buckets(0.01, 2.0, 10), + ) +}); pub static BLOCK_PROCESSING_DB_WRITE: LazyLock> = LazyLock::new(|| { try_create_histogram( "beacon_block_processing_db_write_seconds", @@ -1691,6 +1698,34 @@ pub static 
DATA_COLUMNS_SIDECAR_PROCESSING_SUCCESSES: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
+pub static BLOBS_FROM_EL_HIT_TOTAL: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
+    try_create_int_counter(
+        "beacon_blobs_from_el_hit_total",
+        "Number of blob batches fetched from the execution layer",
+    )
+});
+
+pub static BLOBS_FROM_EL_MISS_TOTAL: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
+    try_create_int_counter(
+        "beacon_blobs_from_el_miss_total",
+        "Number of blob batches that failed to be fetched from the execution layer",
+    )
+});
+
+pub static BLOBS_FROM_EL_EXPECTED_TOTAL: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
+    try_create_int_counter(
+        "beacon_blobs_from_el_expected_total",
+        "Number of blobs expected from the execution layer",
+    )
+});
+
+pub static BLOBS_FROM_EL_RECEIVED_TOTAL: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
+    try_create_int_counter(
+        "beacon_blobs_from_el_received_total",
+        "Number of blobs fetched from the execution layer",
+    )
+});
+
 /*
  * Light server message verification
  */
diff --git a/beacon_node/beacon_chain/src/observed_data_sidecars.rs b/beacon_node/beacon_chain/src/observed_data_sidecars.rs
index 9b59a8f85b1..53f8c71f54e 100644
--- a/beacon_node/beacon_chain/src/observed_data_sidecars.rs
+++ b/beacon_node/beacon_chain/src/observed_data_sidecars.rs
@@ -148,6 +148,31 @@ impl ObservedDataSidecars {
     }
 }

+/// Abstraction to control "observation" of gossip messages (currently just blobs and data columns).
+///
+/// If a type returns `false` for `observe` then the message will not be immediately added to its
+/// respective gossip observation cache. Unobserved messages should usually be observed later.
+pub trait ObservationStrategy {
+    fn observe() -> bool;
+}
+
+/// Type for messages that are observed immediately.
+pub struct Observe;
+/// Type for messages that have not been observed.
+pub struct DoNotObserve;
+
+impl ObservationStrategy for Observe {
+    fn observe() -> bool {
+        true
+    }
+}
+
+impl ObservationStrategy for DoNotObserve {
+    fn observe() -> bool {
+        false
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs
index 9be3b4cc2f9..093ee0c44b4 100644
--- a/beacon_node/beacon_chain/src/test_utils.rs
+++ b/beacon_node/beacon_chain/src/test_utils.rs
@@ -2894,7 +2894,6 @@ pub fn generate_rand_block_and_blobs(
     (block, blob_sidecars)
 }

-#[allow(clippy::type_complexity)]
 pub fn generate_rand_block_and_data_columns(
     fork_name: ForkName,
     num_blobs: NumBlobs,
@@ -2902,12 +2901,12 @@ pub fn generate_rand_block_and_data_columns(
     spec: &ChainSpec,
 ) -> (
     SignedBeaconBlock<E, FullPayload<E>>,
-    Vec<Arc<DataColumnSidecar<E>>>,
+    DataColumnSidecarList<E>,
 ) {
     let kzg = get_kzg(spec);
     let (block, blobs) = generate_rand_block_and_blobs(fork_name, num_blobs, rng);
-    let blob: BlobsList<E> = blobs.into_iter().map(|b| b.blob).collect::<Vec<_>>().into();
-    let data_columns = blobs_to_data_column_sidecars(&blob, &block, &kzg, spec).unwrap();
+    let blob_refs = blobs.iter().map(|b| &b.blob).collect::<Vec<_>>();
+    let data_columns = blobs_to_data_column_sidecars(&blob_refs, &block, &kzg, spec).unwrap();
     (block, data_columns)
 }
diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs
index d239f5089aa..f094a173eec 100644
--- a/beacon_node/beacon_chain/tests/block_verification.rs
+++ b/beacon_node/beacon_chain/tests/block_verification.rs
@@ -976,7 +976,7 @@ async fn block_gossip_verification() {

     harness
         .chain
-        .process_gossip_blob(gossip_verified, || Ok(()))
+        .process_gossip_blob(gossip_verified)
         .await
         .expect("should import valid gossip verified blob");
 }
@@ -1247,7 +1247,7 @@ async fn
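The `ObservationStrategy` trait added in `observed_data_sidecars.rs` above is a compile-time switch built from zero-sized marker types: callers pick `Observe` or `DoNotObserve` as a type parameter, and the branch on `O::observe()` is resolved statically. A minimal self-contained sketch of the same pattern (the `verify_item` function and string cache are invented for illustration, not part of the diff):

```rust
/// Compile-time switch controlling whether a verified message is recorded in an
/// observation cache immediately, mirroring the trait in the diff.
pub trait ObservationStrategy {
    fn observe() -> bool;
}

/// Marker type: record the message as observed straight away.
pub struct Observe;
/// Marker type: defer observation, e.g. so the caller can publish first.
pub struct DoNotObserve;

impl ObservationStrategy for Observe {
    fn observe() -> bool {
        true
    }
}

impl ObservationStrategy for DoNotObserve {
    fn observe() -> bool {
        false
    }
}

/// Hypothetical verification function, generic over the strategy. The `if` below
/// is decided at compile time for each instantiation.
fn verify_item<O: ObservationStrategy>(item: &str, cache: &mut Vec<String>) {
    if O::observe() {
        cache.push(item.to_string());
    }
    // ... verification logic shared by both strategies would live here ...
}

fn main() {
    let mut cache = Vec::new();
    verify_item::<Observe>("blob_0", &mut cache);
    verify_item::<DoNotObserve>("blob_1", &mut cache);
    assert_eq!(cache, vec!["blob_0".to_string()]);
    println!("observed: {cache:?}");
}
```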
verify_block_for_gossip_slashing_detection() { .unwrap(); harness .chain - .process_gossip_blob(verified_blob, || Ok(())) + .process_gossip_blob(verified_blob) .await .unwrap(); } @@ -1726,7 +1726,7 @@ async fn import_execution_pending_block( .unwrap() { ExecutedBlock::Available(block) => chain - .import_available_block(Box::from(block)) + .import_available_block(Box::from(block), None) .await .map_err(|e| format!("{e:?}")), ExecutedBlock::AvailabilityPending(_) => { diff --git a/beacon_node/beacon_chain/tests/events.rs b/beacon_node/beacon_chain/tests/events.rs index 31e69f0524e..ab784d3be45 100644 --- a/beacon_node/beacon_chain/tests/events.rs +++ b/beacon_node/beacon_chain/tests/events.rs @@ -35,7 +35,7 @@ async fn blob_sidecar_event_on_process_gossip_blob() { let _ = harness .chain - .process_gossip_blob(gossip_verified_blob, || Ok(())) + .process_gossip_blob(gossip_verified_blob) .await .unwrap(); diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index 1c23c8ba665..083aaf2e258 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -1,7 +1,7 @@ use crate::engines::ForkchoiceState; use crate::http::{ ENGINE_FORKCHOICE_UPDATED_V1, ENGINE_FORKCHOICE_UPDATED_V2, ENGINE_FORKCHOICE_UPDATED_V3, - ENGINE_GET_CLIENT_VERSION_V1, ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1, + ENGINE_GET_BLOBS_V1, ENGINE_GET_CLIENT_VERSION_V1, ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1, ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1, ENGINE_GET_PAYLOAD_V1, ENGINE_GET_PAYLOAD_V2, ENGINE_GET_PAYLOAD_V3, ENGINE_GET_PAYLOAD_V4, ENGINE_NEW_PAYLOAD_V1, ENGINE_NEW_PAYLOAD_V2, ENGINE_NEW_PAYLOAD_V3, ENGINE_NEW_PAYLOAD_V4, @@ -507,6 +507,7 @@ pub struct EngineCapabilities { pub get_payload_v3: bool, pub get_payload_v4: bool, pub get_client_version_v1: bool, + pub get_blobs_v1: bool, } impl EngineCapabilities { @@ -554,6 +555,9 @@ impl EngineCapabilities { if self.get_client_version_v1 { response.push(ENGINE_GET_CLIENT_VERSION_V1); } + if self.get_blobs_v1 { + response.push(ENGINE_GET_BLOBS_V1); + } response } diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 9c2c43bcf7c..d4734be448d 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -58,6 +58,9 @@ pub const ENGINE_EXCHANGE_CAPABILITIES_TIMEOUT: Duration = Duration::from_secs(1 pub const ENGINE_GET_CLIENT_VERSION_V1: &str = "engine_getClientVersionV1"; pub const ENGINE_GET_CLIENT_VERSION_TIMEOUT: Duration = Duration::from_secs(1); +pub const ENGINE_GET_BLOBS_V1: &str = "engine_getBlobsV1"; +pub const ENGINE_GET_BLOBS_TIMEOUT: Duration = Duration::from_secs(1); + /// This error is returned during a `chainId` call by Geth. 
pub const EIP155_ERROR_STR: &str = "chain not synced beyond EIP-155 replay-protection fork block"; /// This code is returned by all clients when a method is not supported @@ -79,6 +82,7 @@ pub static LIGHTHOUSE_CAPABILITIES: &[&str] = &[ ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1, ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1, ENGINE_GET_CLIENT_VERSION_V1, + ENGINE_GET_BLOBS_V1, ]; /// We opt to initialize the JsonClientVersionV1 rather than the ClientVersionV1 @@ -702,6 +706,20 @@ impl HttpJsonRpc { } } + pub async fn get_blobs( + &self, + versioned_hashes: Vec, + ) -> Result>>, Error> { + let params = json!([versioned_hashes]); + + self.rpc_request( + ENGINE_GET_BLOBS_V1, + params, + ENGINE_GET_BLOBS_TIMEOUT * self.execution_timeout_multiplier, + ) + .await + } + pub async fn get_block_by_number<'a>( &self, query: BlockByNumberQuery<'a>, @@ -1067,6 +1085,7 @@ impl HttpJsonRpc { get_payload_v3: capabilities.contains(ENGINE_GET_PAYLOAD_V3), get_payload_v4: capabilities.contains(ENGINE_GET_PAYLOAD_V4), get_client_version_v1: capabilities.contains(ENGINE_GET_CLIENT_VERSION_V1), + get_blobs_v1: capabilities.contains(ENGINE_GET_BLOBS_V1), }) } diff --git a/beacon_node/execution_layer/src/engine_api/json_structures.rs b/beacon_node/execution_layer/src/engine_api/json_structures.rs index 753554c149a..efd68f1023d 100644 --- a/beacon_node/execution_layer/src/engine_api/json_structures.rs +++ b/beacon_node/execution_layer/src/engine_api/json_structures.rs @@ -7,7 +7,7 @@ use superstruct::superstruct; use types::beacon_block_body::KzgCommitments; use types::blob_sidecar::BlobsList; use types::execution_requests::{ConsolidationRequests, DepositRequests, WithdrawalRequests}; -use types::{FixedVector, Unsigned}; +use types::{Blob, FixedVector, KzgProof, Unsigned}; #[derive(Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -625,6 +625,14 @@ impl From> for BlobsBundle { } } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(bound = "E: EthSpec", rename_all = "camelCase")] +pub struct BlobAndProofV1 { + #[serde(with = "ssz_types::serde_utils::hex_fixed_vec")] + pub blob: Blob, + pub proof: KzgProof, +} + #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct JsonForkchoiceStateV1 { diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index f7e490233fe..08a00d7bf8d 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -4,6 +4,7 @@ //! This crate only provides useful functionality for "The Merge", it does not provide any of the //! deposit-contract functionality that the `beacon_node/eth1` crate already provides. +use crate::json_structures::BlobAndProofV1; use crate::payload_cache::PayloadCache; use arc_swap::ArcSwapOption; use auth::{strip_prefix, Auth, JwtKey}; @@ -65,7 +66,7 @@ mod metrics; pub mod payload_cache; mod payload_status; pub mod test_utils; -mod versioned_hashes; +pub mod versioned_hashes; /// Indicates the default jwt authenticated execution endpoint. 
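For context on the `get_blobs` method added to `HttpJsonRpc` above: `engine_getBlobsV1` takes a single JSON array of 32-byte versioned hashes and returns one `BlobAndProofV1 | null` entry per hash, which is why the Rust side deserialises into `Vec<Option<BlobAndProofV1<E>>>`. A rough sketch of the wire shape, assuming `serde` (with derive) and `serde_json` as dependencies, and using plain `String`s in place of the hex-encoded fixed-size byte types:

```rust
use serde::{Deserialize, Serialize};
use serde_json::json;

/// Mirrors the shape of `BlobAndProofV1` from the diff: camelCase fields, with
/// `String`s standing in for the fixed-length blob and proof byte vectors.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct BlobAndProofV1 {
    blob: String,
    proof: String,
}

fn main() {
    // Request: `params` is a single array of versioned hashes (placeholders here).
    let request = json!({
        "jsonrpc": "2.0",
        "method": "engine_getBlobsV1",
        "params": [["0x01aa...", "0x01bb..."]],
        "id": 1,
    });
    println!("{request}");

    // Response: one entry per requested hash, `null` where the EL does not
    // have the blob in its pool.
    let response = r#"[{"blob": "0x01", "proof": "0x02"}, null]"#;
    let blobs: Vec<Option<BlobAndProofV1>> = serde_json::from_str(response).unwrap();
    assert!(blobs[0].is_some());
    assert!(blobs[1].is_none());
}
```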
pub const DEFAULT_EXECUTION_ENDPOINT: &str = "http://localhost:8551/"; @@ -1857,6 +1858,23 @@ impl ExecutionLayer { } } + pub async fn get_blobs( + &self, + query: Vec, + ) -> Result>>, Error> { + let capabilities = self.get_engine_capabilities(None).await?; + + if capabilities.get_blobs_v1 { + self.engine() + .request(|engine| async move { engine.api.get_blobs(query).await }) + .await + .map_err(Box::new) + .map_err(Error::EngineError) + } else { + Ok(vec![None; query.len()]) + } + } + pub async fn get_block_by_number( &self, query: BlockByNumberQuery<'_>, diff --git a/beacon_node/execution_layer/src/test_utils/mod.rs b/beacon_node/execution_layer/src/test_utils/mod.rs index be99b380543..1e71fde2551 100644 --- a/beacon_node/execution_layer/src/test_utils/mod.rs +++ b/beacon_node/execution_layer/src/test_utils/mod.rs @@ -53,6 +53,7 @@ pub const DEFAULT_ENGINE_CAPABILITIES: EngineCapabilities = EngineCapabilities { get_payload_v3: true, get_payload_v4: true, get_client_version_v1: true, + get_blobs_v1: true, }; pub static DEFAULT_CLIENT_VERSION: LazyLock = diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index fceeb2dd231..b5aa23acf8e 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -1,4 +1,5 @@ use crate::metrics; +use std::future::Future; use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; @@ -13,9 +14,10 @@ use eth2::types::{ PublishBlockRequest, SignedBlockContents, }; use execution_layer::ProvenancedPayload; +use futures::TryFutureExt; use lighthouse_network::{NetworkGlobals, PubsubMessage}; use network::NetworkMessage; -use rand::seq::SliceRandom; +use rand::prelude::SliceRandom; use slog::{debug, error, info, warn, Logger}; use slot_clock::SlotClock; use std::marker::PhantomData; @@ -26,9 +28,8 @@ use tokio::sync::mpsc::UnboundedSender; use tree_hash::TreeHash; use types::{ AbstractExecPayload, BeaconBlockRef, BlobSidecar, BlobsList, BlockImportSource, - DataColumnSidecarList, DataColumnSubnetId, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, - FullPayload, FullPayloadBellatrix, Hash256, KzgProofs, SignedBeaconBlock, - SignedBlindedBeaconBlock, + DataColumnSubnetId, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, FullPayload, + FullPayloadBellatrix, Hash256, KzgProofs, SignedBeaconBlock, SignedBlindedBeaconBlock, }; use warp::http::StatusCode; use warp::{reply::Response, Rejection, Reply}; @@ -97,14 +98,9 @@ pub async fn publish_block>( }; let block = unverified_block.inner_block(); debug!(log, "Signed block received in HTTP API"; "slot" => block.slot()); - let malicious_withhold_count = chain.config.malicious_withhold_count; - let chain_cloned = chain.clone(); /* actually publish a block */ let publish_block_p2p = move |block: Arc>, - should_publish_block: bool, - blob_sidecars: Vec>>, - mut data_column_sidecars: DataColumnSidecarList, sender, log, seen_timestamp| @@ -120,53 +116,16 @@ pub async fn publish_block>( publish_delay, ); - let mut pubsub_messages = if should_publish_block { - info!( - log, - "Signed block published to network via HTTP API"; - "slot" => block.slot(), - "blobs_published" => blob_sidecars.len(), - "publish_delay_ms" => publish_delay.as_millis(), - ); - vec![PubsubMessage::BeaconBlock(block.clone())] - } else { - vec![] - }; + info!( + log, + "Signed block published to network via HTTP API"; + "slot" => block.slot(), + "publish_delay_ms" => 
publish_delay.as_millis(), + ); - match block.as_ref() { - SignedBeaconBlock::Base(_) - | SignedBeaconBlock::Altair(_) - | SignedBeaconBlock::Bellatrix(_) - | SignedBeaconBlock::Capella(_) => { - crate::publish_pubsub_messages(&sender, pubsub_messages) - .map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))?; - } - SignedBeaconBlock::Deneb(_) | SignedBeaconBlock::Electra(_) => { - for blob in blob_sidecars.into_iter() { - pubsub_messages.push(PubsubMessage::BlobSidecar(Box::new((blob.index, blob)))); - } - if malicious_withhold_count > 0 { - let columns_to_keep = data_column_sidecars - .len() - .saturating_sub(malicious_withhold_count); - // Randomize columns before dropping the last malicious_withhold_count items - data_column_sidecars.shuffle(&mut rand::thread_rng()); - drop(data_column_sidecars.drain(columns_to_keep..)); - } + crate::publish_pubsub_message(&sender, PubsubMessage::BeaconBlock(block.clone())) + .map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))?; - for data_col in data_column_sidecars { - let subnet = DataColumnSubnetId::from_column_index::( - data_col.index as usize, - &chain_cloned.spec, - ); - pubsub_messages.push(PubsubMessage::DataColumnSidecar(Box::new(( - subnet, data_col, - )))); - } - crate::publish_pubsub_messages(&sender, pubsub_messages) - .map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish))?; - } - }; Ok(()) }; @@ -174,145 +133,11 @@ pub async fn publish_block>( let slot = block.message().slot(); let sender_clone = network_tx.clone(); - // Convert blobs to either: - // - // 1. Blob sidecars if prior to peer DAS, or - // 2. Data column sidecars if post peer DAS. - let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch()); - - let (blob_sidecars, data_column_sidecars) = match unverified_blobs { - // Pre-PeerDAS: construct blob sidecars for the network. - Some((kzg_proofs, blobs)) if !peer_das_enabled => { - let blob_sidecars = kzg_proofs - .into_iter() - .zip(blobs) - .enumerate() - .map(|(i, (proof, unverified_blob))| { - let _timer = metrics::start_timer( - &beacon_chain::metrics::BLOB_SIDECAR_INCLUSION_PROOF_COMPUTATION, - ); - let blob_sidecar = - BlobSidecar::new(i, unverified_blob, &block, proof).map(Arc::new); - blob_sidecar.map_err(|e| { - error!( - log, - "Invalid blob - not publishing block"; - "error" => ?e, - "blob_index" => i, - "slot" => slot, - ); - warp_utils::reject::custom_bad_request(format!("{e:?}")) - }) - }) - .collect::, Rejection>>()?; - (blob_sidecars, vec![]) - } - // Post PeerDAS: construct data columns. - Some((_, blobs)) => { - // TODO(das): this is sub-optimal and should likely not be happening prior to gossip - // block publishing. - let data_column_sidecars = build_blob_data_column_sidecars(&chain, &block, blobs) - .map_err(|e| { - error!( - log, - "Invalid data column - not publishing block"; - "error" => ?e, - "slot" => slot - ); - warp_utils::reject::custom_bad_request(format!("{e:?}")) - })?; - (vec![], data_column_sidecars) - } - None => (vec![], vec![]), - }; + let build_sidecar_task_handle = + spawn_build_data_sidecar_task(chain.clone(), block.clone(), unverified_blobs, log.clone())?; // Gossip verify the block and blobs/data columns separately. 
let gossip_verified_block_result = unverified_block.into_gossip_verified_block(&chain); - let gossip_verified_blobs = blob_sidecars - .into_iter() - .map(|blob_sidecar| { - let gossip_verified_blob = - GossipVerifiedBlob::new(blob_sidecar.clone(), blob_sidecar.index, &chain); - - match gossip_verified_blob { - Ok(blob) => Ok(Some(blob)), - Err(GossipBlobError::RepeatBlob { proposer, .. }) => { - // Log the error but do not abort publication, we may need to publish the block - // or some of the other blobs if the block & blobs are only partially published - // by the other publisher. - debug!( - log, - "Blob for publication already known"; - "blob_index" => blob_sidecar.index, - "slot" => slot, - "proposer" => proposer, - ); - Ok(None) - } - Err(e) => { - error!( - log, - "Blob for publication is gossip-invalid"; - "blob_index" => blob_sidecar.index, - "slot" => slot, - "error" => ?e, - ); - Err(warp_utils::reject::custom_bad_request(e.to_string())) - } - } - }) - .collect::, Rejection>>()?; - - let gossip_verified_data_columns = data_column_sidecars - .into_iter() - .map(|data_column_sidecar| { - let column_index = data_column_sidecar.index as usize; - let subnet = - DataColumnSubnetId::from_column_index::(column_index, &chain.spec); - let gossip_verified_column = - GossipVerifiedDataColumn::new(data_column_sidecar, subnet.into(), &chain); - - match gossip_verified_column { - Ok(blob) => Ok(Some(blob)), - Err(GossipDataColumnError::PriorKnown { proposer, .. }) => { - // Log the error but do not abort publication, we may need to publish the block - // or some of the other data columns if the block & data columns are only - // partially published by the other publisher. - debug!( - log, - "Data column for publication already known"; - "column_index" => column_index, - "slot" => slot, - "proposer" => proposer, - ); - Ok(None) - } - Err(e) => { - error!( - log, - "Data column for publication is gossip-invalid"; - "column_index" => column_index, - "slot" => slot, - "error" => ?e, - ); - Err(warp_utils::reject::custom_bad_request(format!("{e:?}"))) - } - } - }) - .collect::, Rejection>>()?; - - let publishable_blobs = gossip_verified_blobs - .iter() - .flatten() - .map(|b| b.clone_blob()) - .collect::>(); - - let publishable_data_columns = gossip_verified_data_columns - .iter() - .flatten() - .map(|b| b.clone_data_column()) - .collect::>(); - let block_root = block_root.unwrap_or_else(|| { gossip_verified_block_result.as_ref().map_or_else( |_| block.canonical_root(), @@ -321,12 +146,9 @@ pub async fn publish_block>( }); let should_publish_block = gossip_verified_block_result.is_ok(); - if let BroadcastValidation::Gossip = validation_level { + if BroadcastValidation::Gossip == validation_level && should_publish_block { publish_block_p2p( block.clone(), - should_publish_block, - publishable_blobs.clone(), - publishable_data_columns.clone(), sender_clone.clone(), log.clone(), seen_timestamp, @@ -337,38 +159,39 @@ pub async fn publish_block>( let publish_fn_completed = Arc::new(AtomicBool::new(false)); let block_to_publish = block.clone(); let publish_fn = || { - match validation_level { - BroadcastValidation::Gossip => (), - BroadcastValidation::Consensus => publish_block_p2p( - block_to_publish.clone(), - should_publish_block, - publishable_blobs.clone(), - publishable_data_columns.clone(), - sender_clone.clone(), - log.clone(), - seen_timestamp, - )?, - BroadcastValidation::ConsensusAndEquivocation => { - check_slashable(&chain, block_root, &block_to_publish, &log)?; - publish_block_p2p( + if 
should_publish_block { + match validation_level { + BroadcastValidation::Gossip => (), + BroadcastValidation::Consensus => publish_block_p2p( block_to_publish.clone(), - should_publish_block, - publishable_blobs.clone(), - publishable_data_columns.clone(), sender_clone.clone(), log.clone(), seen_timestamp, - )?; - } - }; + )?, + BroadcastValidation::ConsensusAndEquivocation => { + check_slashable(&chain, block_root, &block_to_publish, &log)?; + publish_block_p2p( + block_to_publish.clone(), + sender_clone.clone(), + log.clone(), + seen_timestamp, + )?; + } + }; + } + publish_fn_completed.store(true, Ordering::SeqCst); Ok(()) }; + // Wait for blobs/columns to get gossip verified before proceeding further as we need them for import. + let (gossip_verified_blobs, gossip_verified_columns) = build_sidecar_task_handle.await?; + for blob in gossip_verified_blobs.into_iter().flatten() { - // Importing the blobs could trigger block import and network publication in the case - // where the block was already seen on gossip. - if let Err(e) = Box::pin(chain.process_gossip_blob(blob, &publish_fn)).await { + publish_blob_sidecars(network_tx, &blob).map_err(|_| { + warp_utils::reject::custom_server_error("unable to publish blob sidecars".into()) + })?; + if let Err(e) = Box::pin(chain.process_gossip_blob(blob)).await { let msg = format!("Invalid blob: {e}"); return if let BroadcastValidation::Gossip = validation_level { Err(warp_utils::reject::broadcast_without_import(msg)) @@ -383,14 +206,12 @@ pub async fn publish_block>( } } - if gossip_verified_data_columns - .iter() - .map(Option::is_some) - .count() - > 0 - { + if gossip_verified_columns.iter().map(Option::is_some).count() > 0 { + publish_column_sidecars(network_tx, &gossip_verified_columns, &chain).map_err(|_| { + warp_utils::reject::custom_server_error("unable to publish data column sidecars".into()) + })?; let sampling_columns_indices = &network_globals.sampling_columns; - let sampling_columns = gossip_verified_data_columns + let sampling_columns = gossip_verified_columns .into_iter() .flatten() .filter(|data_column| sampling_columns_indices.contains(&data_column.index())) @@ -501,6 +322,224 @@ pub async fn publish_block>( } } +type BuildDataSidecarTaskResult = Result< + ( + Vec>>, + Vec>>, + ), + Rejection, +>; + +/// Convert blobs to either: +/// +/// 1. Blob sidecars if prior to peer DAS, or +/// 2. Data column sidecars if post peer DAS. +fn spawn_build_data_sidecar_task( + chain: Arc>, + block: Arc>>, + proofs_and_blobs: UnverifiedBlobs, + log: Logger, +) -> Result>, Rejection> { + chain + .clone() + .task_executor + .spawn_blocking_handle( + move || { + let Some((kzg_proofs, blobs)) = proofs_and_blobs else { + return Ok((vec![], vec![])); + }; + + let peer_das_enabled = chain.spec.is_peer_das_enabled_for_epoch(block.epoch()); + if !peer_das_enabled { + // Pre-PeerDAS: construct blob sidecars for the network. + let gossip_verified_blobs = + build_gossip_verified_blobs(&chain, &block, blobs, kzg_proofs, &log)?; + Ok((gossip_verified_blobs, vec![])) + } else { + // Post PeerDAS: construct data columns. 
+ let gossip_verified_data_columns = + build_gossip_verified_data_columns(&chain, &block, blobs, &log)?; + Ok((vec![], gossip_verified_data_columns)) + } + }, + "build_data_sidecars", + ) + .ok_or(warp_utils::reject::custom_server_error( + "runtime shutdown".to_string(), + )) + .map(|r| { + r.map_err(|_| warp_utils::reject::custom_server_error("join error".to_string())) + .and_then(|output| async move { output }) + }) +} + +fn build_gossip_verified_data_columns( + chain: &BeaconChain, + block: &SignedBeaconBlock>, + blobs: BlobsList, + log: &Logger, +) -> Result>>, Rejection> { + let slot = block.slot(); + let data_column_sidecars = + build_blob_data_column_sidecars(chain, block, blobs).map_err(|e| { + error!( + log, + "Invalid data column - not publishing block"; + "error" => ?e, + "slot" => slot + ); + warp_utils::reject::custom_bad_request(format!("{e:?}")) + })?; + + let slot = block.slot(); + let gossip_verified_data_columns = data_column_sidecars + .into_iter() + .map(|data_column_sidecar| { + let column_index = data_column_sidecar.index as usize; + let subnet = + DataColumnSubnetId::from_column_index::(column_index, &chain.spec); + let gossip_verified_column = + GossipVerifiedDataColumn::new(data_column_sidecar, subnet.into(), chain); + + match gossip_verified_column { + Ok(blob) => Ok(Some(blob)), + Err(GossipDataColumnError::PriorKnown { proposer, .. }) => { + // Log the error but do not abort publication, we may need to publish the block + // or some of the other data columns if the block & data columns are only + // partially published by the other publisher. + debug!( + log, + "Data column for publication already known"; + "column_index" => column_index, + "slot" => slot, + "proposer" => proposer, + ); + Ok(None) + } + Err(e) => { + error!( + log, + "Data column for publication is gossip-invalid"; + "column_index" => column_index, + "slot" => slot, + "error" => ?e, + ); + Err(warp_utils::reject::custom_bad_request(format!("{e:?}"))) + } + } + }) + .collect::, Rejection>>()?; + + Ok(gossip_verified_data_columns) +} + +fn build_gossip_verified_blobs( + chain: &BeaconChain, + block: &SignedBeaconBlock>, + blobs: BlobsList, + kzg_proofs: KzgProofs, + log: &Logger, +) -> Result>>, Rejection> { + let slot = block.slot(); + let gossip_verified_blobs = kzg_proofs + .into_iter() + .zip(blobs) + .enumerate() + .map(|(i, (proof, unverified_blob))| { + let timer = metrics::start_timer( + &beacon_chain::metrics::BLOB_SIDECAR_INCLUSION_PROOF_COMPUTATION, + ); + let blob_sidecar = BlobSidecar::new(i, unverified_blob, block, proof) + .map(Arc::new) + .map_err(|e| { + error!( + log, + "Invalid blob - not publishing block"; + "error" => ?e, + "blob_index" => i, + "slot" => slot, + ); + warp_utils::reject::custom_bad_request(format!("{e:?}")) + })?; + drop(timer); + + let gossip_verified_blob = + GossipVerifiedBlob::new(blob_sidecar.clone(), blob_sidecar.index, chain); + + match gossip_verified_blob { + Ok(blob) => Ok(Some(blob)), + Err(GossipBlobError::RepeatBlob { proposer, .. }) => { + // Log the error but do not abort publication, we may need to publish the block + // or some of the other blobs if the block & blobs are only partially published + // by the other publisher. 
+ debug!( + log, + "Blob for publication already known"; + "blob_index" => blob_sidecar.index, + "slot" => slot, + "proposer" => proposer, + ); + Ok(None) + } + Err(e) => { + error!( + log, + "Blob for publication is gossip-invalid"; + "blob_index" => blob_sidecar.index, + "slot" => slot, + "error" => ?e, + ); + Err(warp_utils::reject::custom_bad_request(e.to_string())) + } + } + }) + .collect::, Rejection>>()?; + + Ok(gossip_verified_blobs) +} + +fn publish_column_sidecars( + sender_clone: &UnboundedSender>, + data_column_sidecars: &[Option>], + chain: &BeaconChain, +) -> Result<(), BlockError> { + let malicious_withhold_count = chain.config.malicious_withhold_count; + let mut data_column_sidecars = data_column_sidecars + .iter() + .flatten() + .map(|d| d.clone_data_column()) + .collect::>(); + if malicious_withhold_count > 0 { + let columns_to_keep = data_column_sidecars + .len() + .saturating_sub(malicious_withhold_count); + // Randomize columns before dropping the last malicious_withhold_count items + data_column_sidecars.shuffle(&mut rand::thread_rng()); + data_column_sidecars.truncate(columns_to_keep); + } + let pubsub_messages = data_column_sidecars + .into_iter() + .map(|data_col| { + let subnet = DataColumnSubnetId::from_column_index::( + data_col.index as usize, + &chain.spec, + ); + PubsubMessage::DataColumnSidecar(Box::new((subnet, data_col))) + }) + .collect::>(); + crate::publish_pubsub_messages(sender_clone, pubsub_messages) + .map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish)) +} + +fn publish_blob_sidecars( + sender_clone: &UnboundedSender>, + blob: &GossipVerifiedBlob, +) -> Result<(), BlockError> { + let pubsub_message = PubsubMessage::BlobSidecar(Box::new((blob.index(), blob.clone_blob()))); + crate::publish_pubsub_message(sender_clone, pubsub_message) + .map_err(|_| BlockError::BeaconChainError(BeaconChainError::UnableToPublish)) +} + async fn post_block_import_logging_and_response( result: Result, validation_level: BroadcastValidation, diff --git a/beacon_node/http_api/tests/broadcast_validation_tests.rs b/beacon_node/http_api/tests/broadcast_validation_tests.rs index f55983ec66a..1338f4f1802 100644 --- a/beacon_node/http_api/tests/broadcast_validation_tests.rs +++ b/beacon_node/http_api/tests/broadcast_validation_tests.rs @@ -1486,7 +1486,7 @@ pub async fn block_seen_on_gossip_with_some_blobs() { tester .harness .chain - .process_gossip_blob(gossip_blob, || panic!("should not publish block yet")) + .process_gossip_blob(gossip_blob) .await .unwrap(); } @@ -1559,7 +1559,7 @@ pub async fn blobs_seen_on_gossip_without_block() { tester .harness .chain - .process_gossip_blob(gossip_blob, || panic!("should not publish block yet")) + .process_gossip_blob(gossip_blob) .await .unwrap(); } @@ -1633,7 +1633,7 @@ pub async fn blobs_seen_on_gossip_without_block_and_no_http_blobs() { tester .harness .chain - .process_gossip_blob(gossip_blob, || panic!("should not publish block yet")) + .process_gossip_blob(gossip_blob) .await .unwrap(); } @@ -1705,7 +1705,7 @@ pub async fn slashable_blobs_seen_on_gossip_cause_failure() { tester .harness .chain - .process_gossip_blob(gossip_blob, || panic!("should not publish block yet")) + .process_gossip_blob(gossip_blob) .await .unwrap(); } diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 4d875cb4a14..aa7194bb056 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ 
b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs
@@ -914,10 +914,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         let blob_slot = verified_blob.slot();
         let blob_index = verified_blob.id().index;

-        let result = self
-            .chain
-            .process_gossip_blob(verified_blob, || Ok(()))
-            .await;
+        let result = self.chain.process_gossip_blob(verified_blob).await;

         match &result {
             Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
@@ -925,7 +922,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                 metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL);
                 info!(
                     self.log,
-                    "Gossipsub blob processed, imported fully available block";
+                    "Gossipsub blob processed - imported fully available block";
                     "block_root" => %block_root
                 );
                 self.chain.recompute_head_at_current_slot().await;
@@ -936,9 +933,9 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                 );
             }
             Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => {
-                trace!(
+                debug!(
                     self.log,
-                    "Processed blob, waiting for other components";
+                    "Processed gossip blob - waiting for other components";
                     "slot" => %slot,
                     "blob_index" => %blob_index,
                     "block_root" => %block_root,
@@ -1079,7 +1076,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                     message_id,
                     peer_id,
                     peer_client,
-                    block,
+                    block.clone(),
                     reprocess_tx.clone(),
                     seen_duration,
                 )
@@ -1497,6 +1494,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                     "slot" => slot,
                     "block_root" => %block_root,
                 );
+
+                // The block is valid; we can now attempt to fetch blobs from the EL using
+                // versioned hashes derived from the block's KZG commitments, without having
+                // to wait for all blobs to arrive from peers if the EL already has them.
+                self.fetch_engine_blobs_and_publish(block.clone(), *block_root)
+                    .await;
             }
             Err(BlockError::ParentUnknown { .. }) => {
                 // This should not occur. It should be checked by `should_forward_block`.
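The comment above refers to deriving versioned hashes from the block's KZG commitments. Per the Deneb spec, a versioned hash is the SHA-256 of the 48-byte commitment with the first byte overwritten by `VERSIONED_HASH_VERSION_KZG = 0x01`. A standalone sketch, assuming the `sha2` crate as a dependency (the all-zero commitment is dummy data):

```rust
use sha2::{Digest, Sha256};

const VERSIONED_HASH_VERSION_KZG: u8 = 0x01;

/// Versioned hash for a 48-byte KZG commitment: sha256(commitment) with the
/// first byte overwritten by the version prefix.
fn kzg_commitment_to_versioned_hash(commitment: &[u8; 48]) -> [u8; 32] {
    let mut hash: [u8; 32] = Sha256::digest(commitment).into();
    hash[0] = VERSIONED_HASH_VERSION_KZG;
    hash
}

fn main() {
    let commitment = [0u8; 48]; // dummy commitment, for illustration only
    let versioned_hash = kzg_commitment_to_versioned_hash(&commitment);
    assert_eq!(versioned_hash[0], VERSIONED_HASH_VERSION_KZG);

    let hex: String = versioned_hash.iter().map(|b| format!("{b:02x}")).collect();
    println!("0x{hex}");
}
```

These are the hashes passed to `engine_getBlobsV1`, so the EL can answer directly from its blob pool.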
diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 76f5e886ff2..a671fd4bdb0 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -1,11 +1,17 @@ use crate::sync::manager::BlockProcessType; use crate::sync::SamplingId; use crate::{service::NetworkMessage, sync::manager::SyncMessage}; +use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError}; +use beacon_chain::fetch_blobs::{ + fetch_and_process_engine_blobs, BlobsOrDataColumns, FetchEngineBlobError, +}; +use beacon_chain::observed_data_sidecars::DoNotObserve; use beacon_chain::{ builder::Witness, eth1_chain::CachingEth1Backend, AvailabilityProcessingStatus, BeaconChain, + BeaconChainTypes, BlockError, NotifyExecutionLayer, }; -use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer}; use beacon_processor::{ work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorChannels, BeaconProcessorSend, DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work, @@ -21,7 +27,8 @@ use lighthouse_network::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, Client, MessageId, NetworkGlobals, PeerId, PubsubMessage, }; -use slog::{debug, error, trace, Logger}; +use rand::prelude::SliceRandom; +use slog::{debug, error, trace, warn, Logger}; use slot_clock::ManualSlotClock; use std::path::PathBuf; use std::sync::Arc; @@ -67,6 +74,9 @@ pub struct NetworkBeaconProcessor { pub log: Logger, } +// Publish blobs in batches of exponentially increasing size. 
+const BLOB_PUBLICATION_EXP_FACTOR: usize = 2; + impl NetworkBeaconProcessor { fn try_send(&self, event: BeaconWorkEvent) -> Result<(), Error> { self.beacon_processor_send @@ -878,6 +888,76 @@ impl NetworkBeaconProcessor { }); } + pub async fn fetch_engine_blobs_and_publish( + self: &Arc, + block: Arc>>, + block_root: Hash256, + ) { + let self_cloned = self.clone(); + let publish_fn = move |blobs_or_data_column| { + match blobs_or_data_column { + BlobsOrDataColumns::Blobs(blobs) => { + self_cloned.publish_blobs_gradually(blobs, block_root); + } + BlobsOrDataColumns::DataColumns(columns) => { + self_cloned.publish_data_columns_gradually(columns, block_root); + } + }; + }; + + match fetch_and_process_engine_blobs( + self.chain.clone(), + block_root, + block.clone(), + publish_fn, + ) + .await + { + Ok(Some(availability)) => match availability { + AvailabilityProcessingStatus::Imported(_) => { + debug!( + self.log, + "Block components retrieved from EL"; + "result" => "imported block and custody columns", + "block_root" => %block_root, + ); + self.chain.recompute_head_at_current_slot().await; + } + AvailabilityProcessingStatus::MissingComponents(_, _) => { + debug!( + self.log, + "Still missing blobs after engine blobs processed successfully"; + "block_root" => %block_root, + ); + } + }, + Ok(None) => { + debug!( + self.log, + "Fetch blobs completed without import"; + "block_root" => %block_root, + ); + } + Err(FetchEngineBlobError::BlobProcessingError(BlockError::DuplicateFullyImported( + .., + ))) => { + debug!( + self.log, + "Fetch blobs duplicate import"; + "block_root" => %block_root, + ); + } + Err(e) => { + error!( + self.log, + "Error fetching or processing blobs from EL"; + "error" => ?e, + "block_root" => %block_root, + ); + } + } + } + /// Attempt to reconstruct all data columns if the following conditions satisfies: /// - Our custody requirement is all columns /// - We have >= 50% of columns, but not all columns @@ -885,25 +965,13 @@ impl NetworkBeaconProcessor { /// Returns `Some(AvailabilityProcessingStatus)` if reconstruction is successfully performed, /// otherwise returns `None`. async fn attempt_data_column_reconstruction( - &self, + self: &Arc, block_root: Hash256, ) -> Option { let result = self.chain.reconstruct_data_columns(block_root).await; match result { Ok(Some((availability_processing_status, data_columns_to_publish))) => { - self.send_network_message(NetworkMessage::Publish { - messages: data_columns_to_publish - .iter() - .map(|d| { - let subnet = DataColumnSubnetId::from_column_index::( - d.index as usize, - &self.chain.spec, - ); - PubsubMessage::DataColumnSidecar(Box::new((subnet, d.clone()))) - }) - .collect(), - }); - + self.publish_data_columns_gradually(data_columns_to_publish, block_root); match &availability_processing_status { AvailabilityProcessingStatus::Imported(hash) => { debug!( @@ -946,6 +1014,175 @@ impl NetworkBeaconProcessor { } } } + + /// This function gradually publishes blobs to the network in randomised batches. + /// + /// This is an optimisation to reduce outbound bandwidth and ensures each blob is published + /// by some nodes on the network as soon as possible. Our hope is that some blobs arrive from + /// other nodes in the meantime, obviating the need for us to publish them. If no other + /// publisher exists for a blob, it will eventually get published here. 
+ fn publish_blobs_gradually( + self: &Arc, + mut blobs: Vec>, + block_root: Hash256, + ) { + let self_clone = self.clone(); + + self.executor.spawn( + async move { + let chain = self_clone.chain.clone(); + let log = self_clone.chain.logger(); + let publish_fn = |blobs: Vec>>| { + self_clone.send_network_message(NetworkMessage::Publish { + messages: blobs + .into_iter() + .map(|blob| PubsubMessage::BlobSidecar(Box::new((blob.index, blob)))) + .collect(), + }); + }; + + // Permute the blobs and split them into batches. + // The hope is that we won't need to publish some blobs because we will receive them + // on gossip from other nodes. + blobs.shuffle(&mut rand::thread_rng()); + + let blob_publication_batch_interval = chain.config.blob_publication_batch_interval; + let mut publish_count = 0usize; + let blob_count = blobs.len(); + let mut blobs_iter = blobs.into_iter().peekable(); + let mut batch_size = 1usize; + + while blobs_iter.peek().is_some() { + let batch = blobs_iter.by_ref().take(batch_size); + let publishable = batch + .filter_map(|unobserved| match unobserved.observe(&chain) { + Ok(observed) => Some(observed.clone_blob()), + Err(GossipBlobError::RepeatBlob { .. }) => None, + Err(e) => { + warn!( + log, + "Previously verified blob is invalid"; + "error" => ?e + ); + None + } + }) + .collect::>(); + + if !publishable.is_empty() { + debug!( + log, + "Publishing blob batch"; + "publish_count" => publishable.len(), + "block_root" => ?block_root, + ); + publish_count += publishable.len(); + publish_fn(publishable); + } + + tokio::time::sleep(blob_publication_batch_interval).await; + batch_size *= BLOB_PUBLICATION_EXP_FACTOR; + } + + debug!( + log, + "Batch blob publication complete"; + "batch_interval" => blob_publication_batch_interval.as_millis(), + "blob_count" => blob_count, + "published_count" => publish_count, + "block_root" => ?block_root, + ) + }, + "gradual_blob_publication", + ); + } + + /// This function gradually publishes data columns to the network in randomised batches. + /// + /// This is an optimisation to reduce outbound bandwidth and ensures each column is published + /// by some nodes on the network as soon as possible. Our hope is that some columns arrive from + /// other supernodes in the meantime, obviating the need for us to publish them. If no other + /// publisher exists for a column, it will eventually get published here. + fn publish_data_columns_gradually( + self: &Arc, + mut data_columns_to_publish: DataColumnSidecarList, + block_root: Hash256, + ) { + let self_clone = self.clone(); + + self.executor.spawn( + async move { + let chain = self_clone.chain.clone(); + let log = self_clone.chain.logger(); + let publish_fn = |columns: DataColumnSidecarList| { + self_clone.send_network_message(NetworkMessage::Publish { + messages: columns + .into_iter() + .map(|d| { + let subnet = DataColumnSubnetId::from_column_index::( + d.index as usize, + &chain.spec, + ); + PubsubMessage::DataColumnSidecar(Box::new((subnet, d))) + }) + .collect(), + }); + }; + + // If this node is a super node, permute the columns and split them into batches. + // The hope is that we won't need to publish some columns because we will receive them + // on gossip from other supernodes. 
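A distilled view of the batching loop in `publish_blobs_gradually` above: batch sizes grow by `BLOB_PUBLICATION_EXP_FACTOR` each round (1, 2, 4, ...), with a pause between rounds so gossip from other publishers can make later batches unnecessary. This sketch uses a plain `Vec` and a blocking sleep in place of the async executor, shuffling, and observation-cache checks; `publish_gradually` and `publish` are illustrative names:

```rust
use std::{thread, time::Duration};

const BLOB_PUBLICATION_EXP_FACTOR: usize = 2;

/// Publish `items` in batches of exponentially increasing size, sleeping between
/// batches. `publish` stands in for the network send and `interval` for
/// `blob_publication_batch_interval`.
fn publish_gradually<T>(items: Vec<T>, interval: Duration, mut publish: impl FnMut(Vec<T>)) {
    let mut iter = items.into_iter().peekable();
    let mut batch_size = 1usize;

    while iter.peek().is_some() {
        // `by_ref().take(n)` pulls up to `n` items without consuming the iterator itself.
        let batch: Vec<T> = iter.by_ref().take(batch_size).collect();
        publish(batch);
        thread::sleep(interval);
        batch_size *= BLOB_PUBLICATION_EXP_FACTOR;
    }
}

fn main() {
    // Six blob indices are sent as batches [0], [1, 2], [3, 4, 5].
    publish_gradually((0..6).collect(), Duration::from_millis(10), |batch| {
        println!("publishing batch: {batch:?}")
    });
}
```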
+ data_columns_to_publish.shuffle(&mut rand::thread_rng()); + + let blob_publication_batch_interval = chain.config.blob_publication_batch_interval; + let blob_publication_batches = chain.config.blob_publication_batches; + let batch_size = chain.spec.number_of_columns / blob_publication_batches; + let mut publish_count = 0usize; + + for batch in data_columns_to_publish.chunks(batch_size) { + let publishable = batch + .iter() + .filter_map(|col| match observe_gossip_data_column(col, &chain) { + Ok(()) => Some(col.clone()), + Err(GossipDataColumnError::PriorKnown { .. }) => None, + Err(e) => { + warn!( + log, + "Previously verified data column is invalid"; + "error" => ?e + ); + None + } + }) + .collect::>(); + + if !publishable.is_empty() { + debug!( + log, + "Publishing data column batch"; + "publish_count" => publishable.len(), + "block_root" => ?block_root, + ); + publish_count += publishable.len(); + publish_fn(publishable); + } + + tokio::time::sleep(blob_publication_batch_interval).await; + } + + debug!( + log, + "Batch data column publishing complete"; + "batch_size" => batch_size, + "batch_interval" => blob_publication_batch_interval.as_millis(), + "data_columns_to_publish_count" => data_columns_to_publish.len(), + "published_count" => publish_count, + "block_root" => ?block_root, + ) + }, + "gradual_data_column_publication", + ); + } } type TestBeaconChainType = diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 82d06c20f8e..164dbc71154 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -153,6 +153,7 @@ impl NetworkBeaconProcessor { "process_type" => ?process_type, ); + let signed_beacon_block = block.block_cloned(); let result = self .chain .process_block_with_early_caching( @@ -166,26 +167,36 @@ impl NetworkBeaconProcessor { metrics::inc_counter(&metrics::BEACON_PROCESSOR_RPC_BLOCK_IMPORTED_TOTAL); // RPC block imported, regardless of process type - if let &Ok(AvailabilityProcessingStatus::Imported(hash)) = &result { - info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash); + match result.as_ref() { + Ok(AvailabilityProcessingStatus::Imported(hash)) => { + info!(self.log, "New RPC block received"; "slot" => slot, "hash" => %hash); - // Trigger processing for work referencing this block. - let reprocess_msg = ReprocessQueueMessage::BlockImported { - block_root: hash, - parent_root, - }; - if reprocess_tx.try_send(reprocess_msg).is_err() { - error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash) - }; - self.chain.block_times_cache.write().set_time_observed( - hash, - slot, - seen_timestamp, - None, - None, - ); + // Trigger processing for work referencing this block. 
+                    let reprocess_msg = ReprocessQueueMessage::BlockImported {
+                        block_root: *hash,
+                        parent_root,
+                    };
+                    if reprocess_tx.try_send(reprocess_msg).is_err() {
+                        error!(self.log, "Failed to inform block import"; "source" => "rpc", "block_root" => %hash)
+                    };
+                    self.chain.block_times_cache.write().set_time_observed(
+                        *hash,
+                        slot,
+                        seen_timestamp,
+                        None,
+                        None,
+                    );

-            self.chain.recompute_head_at_current_slot().await;
+                    self.chain.recompute_head_at_current_slot().await;
+                }
+                Ok(AvailabilityProcessingStatus::MissingComponents(..)) => {
+                    // The block is valid; we can now attempt to fetch blobs from the EL using
+                    // versioned hashes derived from the block's KZG commitments, without having
+                    // to wait for all blobs to arrive from peers if the EL already has them.
+                    self.fetch_engine_blobs_and_publish(signed_beacon_block, block_root)
+                        .await
+                }
+                _ => {}
             }

         // RPC block imported or execution validated. If the block was already imported by gossip we
diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs
index dff030fb0f1..c5427e0ce02 100644
--- a/beacon_node/src/cli.rs
+++ b/beacon_node/src/cli.rs
@@ -86,6 +86,24 @@ pub fn cli_app() -> Command {
             .hide(true)
             .display_order(0)
         )
+        .arg(
+            Arg::new("blob-publication-batches")
+                .long("blob-publication-batches")
+                .action(ArgAction::Set)
+                .help_heading(FLAG_HEADER)
+                .help("Number of batches that the node splits blobs or data columns into during publication. This doesn't apply if the node is the block proposer. Used in PeerDAS only.")
+                .display_order(0)
+                .hide(true)
+        )
+        .arg(
+            Arg::new("blob-publication-batch-interval")
+                .long("blob-publication-batch-interval")
+                .action(ArgAction::Set)
+                .help_heading(FLAG_HEADER)
+                .help("The delay in milliseconds applied by the node between sending each blob or data column batch. This doesn't apply if the node is the block proposer.")
+                .display_order(0)
+                .hide(true)
+        )
         .arg(
             Arg::new("subscribe-all-subnets")
                 .long("subscribe-all-subnets")
diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs
index 2d318153515..0f17c368005 100644
--- a/beacon_node/src/config.rs
+++ b/beacon_node/src/config.rs
@@ -192,6 +192,15 @@ pub fn get_config(
         client_config.chain.enable_sampling = true;
     }

+    if let Some(batches) = clap_utils::parse_optional(cli_args, "blob-publication-batches")? {
+        client_config.chain.blob_publication_batches = batches;
+    }
+
+    if let Some(interval) = clap_utils::parse_optional(cli_args, "blob-publication-batch-interval")?
+    {
+        client_config.chain.blob_publication_batch_interval = Duration::from_millis(interval);
+    }
+
     /*
      * Prometheus metrics HTTP server
      */
diff --git a/consensus/types/src/beacon_block_body.rs b/consensus/types/src/beacon_block_body.rs
index c81e7bcde93..b970980c861 100644
--- a/consensus/types/src/beacon_block_body.rs
+++ b/consensus/types/src/beacon_block_body.rs
@@ -178,57 +178,71 @@ impl<'a, E: EthSpec, Payload: AbstractExecPayload<E>> BeaconBlockBodyRef<'a, E,
         leaves
     }

-    /// Produces the proof of inclusion for a `KzgCommitment` in `self.blob_kzg_commitments`
-    /// at `index`.
+    /// Calculate a KZG commitment merkle proof.
+    ///
+    /// Prefer to use `complete_kzg_commitment_merkle_proof` with a reused proof for the
+    /// `blob_kzg_commitments` field.
     pub fn kzg_commitment_merkle_proof(
         &self,
         index: usize,
     ) -> Result<FixedVector<Hash256, E::KzgCommitmentInclusionProofDepth>, Error> {
-        // We compute the branches by generating 2 merkle trees:
-        //    1. Merkle tree for the `blob_kzg_commitments` List object
-        //    2.
Merkle tree for the `BeaconBlockBody` container - // We then merge the branches for both the trees all the way up to the root. - - // Part1 (Branches for the subtree rooted at `blob_kzg_commitments`) - // - // Branches for `blob_kzg_commitments` without length mix-in - let blob_leaves = self - .blob_kzg_commitments()? - .iter() - .map(|commitment| commitment.tree_hash_root()) - .collect::>(); - let depth = E::max_blob_commitments_per_block() - .next_power_of_two() - .ilog2(); - let tree = MerkleTree::create(&blob_leaves, depth as usize); - let (_, mut proof) = tree - .generate_proof(index, depth as usize) - .map_err(Error::MerkleTreeError)?; - - // Add the branch corresponding to the length mix-in. - let length = blob_leaves.len(); - let usize_len = std::mem::size_of::(); - let mut length_bytes = [0; BYTES_PER_CHUNK]; - length_bytes - .get_mut(0..usize_len) - .ok_or(Error::MerkleTreeError(MerkleTreeError::PleaseNotifyTheDevs))? - .copy_from_slice(&length.to_le_bytes()); - let length_root = Hash256::from_slice(length_bytes.as_slice()); - proof.push(length_root); - - // Part 2 - // Branches for `BeaconBlockBody` container - let body_leaves = self.body_merkle_leaves(); - let beacon_block_body_depth = body_leaves.len().next_power_of_two().ilog2() as usize; - let tree = MerkleTree::create(&body_leaves, beacon_block_body_depth); - let (_, mut proof_body) = tree - .generate_proof(BLOB_KZG_COMMITMENTS_INDEX, beacon_block_body_depth) - .map_err(Error::MerkleTreeError)?; - // Join the proofs for the subtree and the main tree - proof.append(&mut proof_body); - debug_assert_eq!(proof.len(), E::kzg_proof_inclusion_proof_depth()); + let kzg_commitments_proof = self.kzg_commitments_merkle_proof()?; + let proof = self.complete_kzg_commitment_merkle_proof(index, &kzg_commitments_proof)?; + Ok(proof) + } - Ok(proof.into()) + /// Produces the proof of inclusion for a `KzgCommitment` in `self.blob_kzg_commitments` + /// at `index` using an existing proof for the `blob_kzg_commitments` field. + pub fn complete_kzg_commitment_merkle_proof( + &self, + index: usize, + kzg_commitments_proof: &[Hash256], + ) -> Result, Error> { + match self { + Self::Base(_) | Self::Altair(_) | Self::Bellatrix(_) | Self::Capella(_) => { + Err(Error::IncorrectStateVariant) + } + Self::Deneb(_) | Self::Electra(_) => { + // We compute the branches by generating 2 merkle trees: + // 1. Merkle tree for the `blob_kzg_commitments` List object + // 2. Merkle tree for the `BeaconBlockBody` container + // We then merge the branches for both the trees all the way up to the root. + + // Part1 (Branches for the subtree rooted at `blob_kzg_commitments`) + // + // Branches for `blob_kzg_commitments` without length mix-in + let blob_leaves = self + .blob_kzg_commitments()? + .iter() + .map(|commitment| commitment.tree_hash_root()) + .collect::>(); + let depth = E::max_blob_commitments_per_block() + .next_power_of_two() + .ilog2(); + let tree = MerkleTree::create(&blob_leaves, depth as usize); + let (_, mut proof) = tree + .generate_proof(index, depth as usize) + .map_err(Error::MerkleTreeError)?; + + // Add the branch corresponding to the length mix-in. + let length = blob_leaves.len(); + let usize_len = std::mem::size_of::(); + let mut length_bytes = [0; BYTES_PER_CHUNK]; + length_bytes + .get_mut(0..usize_len) + .ok_or(Error::MerkleTreeError(MerkleTreeError::PleaseNotifyTheDevs))? 
+ .copy_from_slice(&length.to_le_bytes()); + let length_root = Hash256::from_slice(length_bytes.as_slice()); + proof.push(length_root); + + // Part 2 + // Branches for `BeaconBlockBody` container + // Join the proofs for the subtree and the main tree + proof.extend_from_slice(kzg_commitments_proof); + + Ok(FixedVector::new(proof)?) + } + } } /// Produces the proof of inclusion for `self.blob_kzg_commitments`. @@ -241,7 +255,7 @@ impl<'a, E: EthSpec, Payload: AbstractExecPayload> BeaconBlockBodyRef<'a, E, let (_, proof) = tree .generate_proof(BLOB_KZG_COMMITMENTS_INDEX, beacon_block_body_depth) .map_err(Error::MerkleTreeError)?; - Ok(proof.into()) + Ok(FixedVector::new(proof)?) } pub fn block_body_merkle_proof(&self, generalized_index: usize) -> Result, Error> { diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index 0f7dbb2673c..5a330388cce 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -150,6 +150,37 @@ impl BlobSidecar { }) } + pub fn new_with_existing_proof( + index: usize, + blob: Blob, + signed_block: &SignedBeaconBlock, + signed_block_header: SignedBeaconBlockHeader, + kzg_commitments_inclusion_proof: &[Hash256], + kzg_proof: KzgProof, + ) -> Result { + let expected_kzg_commitments = signed_block + .message() + .body() + .blob_kzg_commitments() + .map_err(|_e| BlobSidecarError::PreDeneb)?; + let kzg_commitment = *expected_kzg_commitments + .get(index) + .ok_or(BlobSidecarError::MissingKzgCommitment)?; + let kzg_commitment_inclusion_proof = signed_block + .message() + .body() + .complete_kzg_commitment_merkle_proof(index, kzg_commitments_inclusion_proof)?; + + Ok(Self { + index: index as u64, + blob, + kzg_commitment, + kzg_proof, + signed_block_header, + kzg_commitment_inclusion_proof, + }) + } + pub fn id(&self) -> BlobIdentifier { BlobIdentifier { block_root: self.block_root(), diff --git a/consensus/types/src/signed_beacon_block.rs b/consensus/types/src/signed_beacon_block.rs index b52adcfe412..c1f7a49e404 100644 --- a/consensus/types/src/signed_beacon_block.rs +++ b/consensus/types/src/signed_beacon_block.rs @@ -239,6 +239,37 @@ impl> SignedBeaconBlock } } + /// Produce a signed beacon block header AND a merkle proof for the KZG commitments. + /// + /// This method is more efficient than generating each part separately as it reuses hashing. + pub fn signed_block_header_and_kzg_commitments_proof( + &self, + ) -> Result< + ( + SignedBeaconBlockHeader, + FixedVector, + ), + Error, + > { + let proof = self.message().body().kzg_commitments_merkle_proof()?; + let body_root = *proof.last().unwrap(); + + let block_header = BeaconBlockHeader { + slot: self.slot(), + proposer_index: self.message().proposer_index(), + parent_root: self.parent_root(), + state_root: self.state_root(), + body_root, + }; + + let signed_header = SignedBeaconBlockHeader { + message: block_header, + signature: self.signature().clone(), + }; + + Ok((signed_header, proof)) + } + /// Convenience accessor for the block's slot. 
pub fn slot(&self) -> Slot { self.message().slot() diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index ac7ddcdbd98..83a4367252a 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -800,6 +800,27 @@ fn network_enable_sampling_flag() { .run_with_zero_port() .with_config(|config| assert!(config.chain.enable_sampling)); } +#[test] +fn blob_publication_batches() { + CommandLineTest::new() + .flag("blob-publication-batches", Some("3")) + .run_with_zero_port() + .with_config(|config| assert_eq!(config.chain.blob_publication_batches, 3)); +} + +#[test] +fn blob_publication_batch_interval() { + CommandLineTest::new() + .flag("blob-publication-batch-interval", Some("400")) + .run_with_zero_port() + .with_config(|config| { + assert_eq!( + config.chain.blob_publication_batch_interval, + Duration::from_millis(400) + ) + }); +} + #[test] fn network_enable_sampling_flag_default() { CommandLineTest::new() diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs index 8d933a6fcd5..33ae132e8a2 100644 --- a/testing/ef_tests/src/cases/fork_choice.rs +++ b/testing/ef_tests/src/cases/fork_choice.rs @@ -505,8 +505,8 @@ impl Tester { } Err(_) => GossipVerifiedBlob::__assumed_valid(blob_sidecar), }; - let result = self - .block_on_dangerous(self.harness.chain.process_gossip_blob(blob, || Ok(())))?; + let result = + self.block_on_dangerous(self.harness.chain.process_gossip_blob(blob))?; if valid { assert!(result.is_ok()); }
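The `beacon_block_body.rs` and `signed_beacon_block.rs` changes above exploit the fact that every per-blob inclusion proof shares the same upper branch, from the `blob_kzg_commitments` root up to the body root. `kzg_commitments_merkle_proof` computes that shared branch once, and `complete_kzg_commitment_merkle_proof` appends it to each per-index subtree proof. A toy sketch of the idea with two 4-leaf trees and a stand-in hash (the real code uses SSZ tree-hashing plus a length mix-in, omitted here):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Stand-in two-input hash; real merkleization uses sha256 over 32-byte chunks.
fn h2(a: u64, b: u64) -> u64 {
    let mut s = DefaultHasher::new();
    (a, b).hash(&mut s);
    s.finish()
}

/// Root and per-leaf proofs for a 4-leaf tree: proof(i) = [sibling, uncle].
fn tree4(leaves: [u64; 4]) -> (u64, [[u64; 2]; 4]) {
    let l = [h2(leaves[0], leaves[1]), h2(leaves[2], leaves[3])];
    let root = h2(l[0], l[1]);
    let proofs = [
        [leaves[1], l[1]],
        [leaves[0], l[1]],
        [leaves[3], l[0]],
        [leaves[2], l[0]],
    ];
    (root, proofs)
}

fn main() {
    // "Commitments" subtree: one proof per commitment against the subtree root.
    let commitments = [11, 22, 33, 44];
    let (commitments_root, commitment_proofs) = tree4(commitments);

    // "Body" with the commitments root as field 3. The branch proving field 3
    // against the body root is identical for every commitment, so it is
    // computed once and reused, like `kzg_commitments_merkle_proof`.
    let body_fields = [1, 2, 3, commitments_root];
    let (body_root, body_proofs) = tree4(body_fields);
    let shared_body_branch = body_proofs[3];

    // Complete proof for commitment i = subtree branch ++ shared body branch,
    // like `complete_kzg_commitment_merkle_proof`.
    for (i, subtree_proof) in commitment_proofs.iter().enumerate() {
        let proof: Vec<u64> = subtree_proof
            .iter()
            .chain(shared_body_branch.iter())
            .copied()
            .collect();
        println!("commitment {i}: proof {proof:?} against body root {body_root:x}");
    }
}
```

This is also why `signed_block_header_and_kzg_commitments_proof` produces the header and the proof together: the body root is the last element of the shared branch, so one pass yields both.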