diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 26e15f5ec79..7e0fb3b3fdd 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3047,13 +3047,7 @@ impl BeaconChain { return Err(BlockError::BlobNotRequired(blob.slot())); } - if let Some(event_handler) = self.event_handler.as_ref() { - if event_handler.has_blob_sidecar_subscribers() { - event_handler.register(EventKind::BlobSidecar(SseBlobSidecar::from_blob_sidecar( - blob.as_blob(), - ))); - } - } + self.emit_sse_blob_sidecar_events(&block_root, std::iter::once(blob.as_blob())); let r = self.check_gossip_blob_availability_and_import(blob).await; self.remove_notified(&block_root, r) @@ -3116,15 +3110,7 @@ impl BeaconChain { return Err(BlockError::BlockIsAlreadyKnown(block_root)); } - if let Some(event_handler) = self.event_handler.as_ref() { - if event_handler.has_blob_sidecar_subscribers() { - for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) { - event_handler.register(EventKind::BlobSidecar( - SseBlobSidecar::from_blob_sidecar(blob), - )); - } - } - } + self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref)); let r = self .check_rpc_blob_availability_and_import(slot, block_root, blobs) @@ -3149,20 +3135,33 @@ impl BeaconChain { return Err(BlockError::BlockIsAlreadyKnown(block_root)); } + self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref)); + + let r = self + .check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv) + .await; + self.remove_notified(&block_root, r) + } + + fn emit_sse_blob_sidecar_events<'a, I>(self: &Arc, block_root: &Hash256, blobs_iter: I) + where + I: Iterator>, + { if let Some(event_handler) = self.event_handler.as_ref() { if event_handler.has_blob_sidecar_subscribers() { - for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) { + let 
imported_blobs = self + .data_availability_checker + .imported_blob_indexes(block_root) + .unwrap_or_default(); + let new_blobs = blobs_iter.filter(|b| !imported_blobs.contains(&b.index)); + + for blob in new_blobs { event_handler.register(EventKind::BlobSidecar( SseBlobSidecar::from_blob_sidecar(blob), )); } } } - - let r = self - .check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv) - .await; - self.remove_notified(&block_root, r) } /// Cache the columns in the processing cache, process it, then evict it from the cache if it was @@ -3891,6 +3890,10 @@ impl BeaconChain { // See https://github.com/sigp/lighthouse/issues/2028 let (_, signed_block, blobs, data_columns) = signed_block.deconstruct(); let custody_columns_count = self.data_availability_checker.get_custody_columns_count(); + // If the block was made available via custody columns received from gossip / rpc, use them + // since we already have them. + // Otherwise, blobs were likely made available by fetching from the EL; in this case we + // wait for the data columns to be computed (blocking). let data_columns = data_columns .filter(|a| a.len() >= custody_columns_count) .or_else(|| data_column_recv?.blocking_recv()); diff --git a/beacon_node/beacon_chain/src/fetch_blobs.rs b/beacon_node/beacon_chain/src/fetch_blobs.rs index 706c03e31ab..2f9da5a4c80 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs.rs @@ -1,3 +1,12 @@ +//! This module implements an optimisation to fetch blobs via JSON-RPC from the EL. +//! If a blob has already been seen in the public mempool, then it is often unnecessary to wait for +//! it to arrive on P2P gossip. This module uses a new JSON-RPC method (`engine_getBlobsV1`) which +//! allows the CL to load the blobs quickly from the EL's blob pool. +//! +//! Once the node fetches the blobs from EL, it then publishes the remaining blobs not yet seen +//! on P2P gossip to the network. 
From PeerDAS onwards, together with the increase in blob count, +//! broadcasting blobs requires a much higher bandwidth, and is only done by high capacity +//! supernodes. use crate::observed_data_sidecars::ObservableDataSidecar; use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ExecutionPayloadError}; use itertools::Either; @@ -15,6 +24,8 @@ pub enum BlobsOrDataColumns { DataColumns(DataColumnSidecarVec), } +/// Fetches blobs from the EL mempool and processes them. It also broadcasts unseen blobs or +/// data columns (PeerDAS onwards) to the network, using the supplied `publish_fn`. pub async fn fetch_and_process_engine_blobs( chain: Arc>, block_root: Hash256, @@ -81,7 +92,7 @@ pub async fn fetch_and_process_engine_blobs( .enumerate() .filter_map(|(i, opt_blob)| Some((i, opt_blob?))) { - match BlobSidecar::new_efficiently( + match BlobSidecar::new_with_existing_proof( i, blob_and_proof.blob, &block, @@ -202,7 +213,7 @@ pub async fn fetch_and_process_engine_blobs( block.slot(), block_root, fixed_blob_sidecar_list.clone(), - Some(data_columns_receiver), + peer_das_enabled.then_some(data_columns_receiver), ) .await .map(|_| debug!(chain.log, "Blobs from EL - processed")) diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index 85f6f55834f..759e10f6f2c 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -149,7 +149,7 @@ impl BlobSidecar { }) } - pub fn new_efficiently( + pub fn new_with_existing_proof( index: usize, blob: Blob, signed_block: &SignedBeaconBlock,