Commit 01b91cb
Avoid double-emitting SSE blob events. Add code comments.
jimmygchen committed Aug 20, 2024
1 parent 0ecb203 commit 01b91cb
Showing 3 changed files with 39 additions and 25 deletions.
beacon_node/beacon_chain/src/beacon_chain.rs (47 changes: 25 additions & 22 deletions)
@@ -3047,13 +3047,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             return Err(BlockError::BlobNotRequired(blob.slot()));
         }

-        if let Some(event_handler) = self.event_handler.as_ref() {
-            if event_handler.has_blob_sidecar_subscribers() {
-                event_handler.register(EventKind::BlobSidecar(SseBlobSidecar::from_blob_sidecar(
-                    blob.as_blob(),
-                )));
-            }
-        }
+        self.emit_sse_blob_sidecar_events(&block_root, std::iter::once(blob.as_blob()));

         let r = self.check_gossip_blob_availability_and_import(blob).await;
         self.remove_notified(&block_root, r)
@@ -3116,15 +3110,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             return Err(BlockError::BlockIsAlreadyKnown(block_root));
         }

-        if let Some(event_handler) = self.event_handler.as_ref() {
-            if event_handler.has_blob_sidecar_subscribers() {
-                for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) {
-                    event_handler.register(EventKind::BlobSidecar(
-                        SseBlobSidecar::from_blob_sidecar(blob),
-                    ));
-                }
-            }
-        }
+        self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));

         let r = self
             .check_rpc_blob_availability_and_import(slot, block_root, blobs)
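Both call sites above normalise their input into a single iterator of `&BlobSidecar` before calling the shared helper: the gossip path wraps its one blob in `std::iter::once`, while the RPC path turns a list of optional `Arc`-wrapped blobs into plain references with `flatten().map(Arc::as_ref)`. A minimal standalone sketch of those two adaptor shapes (the `Sidecar` type and `emit` function here are simplified stand-ins, not Lighthouse's types):

    use std::sync::Arc;

    // Stand-in for BlobSidecar<E>; illustration only.
    struct Sidecar {
        index: u64,
    }

    // Stand-in for the shared helper: it only needs *some* iterator of references.
    fn emit<'a>(blobs: impl Iterator<Item = &'a Sidecar>) {
        for blob in blobs {
            println!("emit blob {}", blob.index);
        }
    }

    fn main() {
        // Gossip path: exactly one sidecar.
        let single = Sidecar { index: 0 };
        emit(std::iter::once(&single));

        // RPC path: optional slots; `flatten()` skips the `None`s and
        // `map(Arc::as_ref)` borrows the sidecar out of each `Arc`.
        let list: Vec<Option<Arc<Sidecar>>> = vec![
            Some(Arc::new(Sidecar { index: 1 })),
            None,
            Some(Arc::new(Sidecar { index: 2 })),
        ];
        emit(list.iter().flatten().map(Arc::as_ref));
    }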
@@ -3149,20 +3135,33 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             return Err(BlockError::BlockIsAlreadyKnown(block_root));
         }

+        self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
+
+        let r = self
+            .check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv)
+            .await;
+        self.remove_notified(&block_root, r)
+    }
+
+    fn emit_sse_blob_sidecar_events<'a, I>(self: &Arc<Self>, block_root: &Hash256, blobs_iter: I)
+    where
+        I: Iterator<Item = &'a BlobSidecar<T::EthSpec>>,
+    {
         if let Some(event_handler) = self.event_handler.as_ref() {
             if event_handler.has_blob_sidecar_subscribers() {
-                for blob in blobs.iter().filter_map(|maybe_blob| maybe_blob.as_ref()) {
+                let imported_blobs = self
+                    .data_availability_checker
+                    .imported_blob_indexes(block_root)
+                    .unwrap_or_default();
+                let new_blobs = blobs_iter.filter(|b| !imported_blobs.contains(&b.index));
+
+                for blob in new_blobs {
                     event_handler.register(EventKind::BlobSidecar(
                         SseBlobSidecar::from_blob_sidecar(blob),
                     ));
                 }
             }
         }
-
-        let r = self
-            .check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv)
-            .await;
-        self.remove_notified(&block_root, r)
     }

     /// Cache the columns in the processing cache, process it, then evict it from the cache if it was
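The double emission this commit fixes happened when the same blob reached the node twice (for example, once via gossip and again via the EL fetch): each path used to fire its own SSE event. The helper now skips indexes the data availability checker has already imported. A minimal sketch of that dedup idea, with a plain `HashSet` standing in for `data_availability_checker.imported_blob_indexes` (types simplified for illustration):

    use std::collections::HashSet;

    struct Sidecar {
        index: u64,
    }

    /// Emit an event only for sidecars whose index has not already been imported.
    fn emit_new<'a>(imported: &HashSet<u64>, blobs: impl Iterator<Item = &'a Sidecar>) {
        for blob in blobs.filter(|b| !imported.contains(&b.index)) {
            println!("SSE blob_sidecar event: index {}", blob.index);
        }
    }

    fn main() {
        // Indexes 0 and 2 were imported (and announced) earlier.
        let imported: HashSet<u64> = [0u64, 2].into_iter().collect();
        let blobs = vec![
            Sidecar { index: 0 },
            Sidecar { index: 1 },
            Sidecar { index: 2 },
        ];
        // Only index 1 produces an event; 0 and 2 are filtered out.
        emit_new(&imported, blobs.iter());
    }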
@@ -3891,6 +3890,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         // See https://github.com/sigp/lighthouse/issues/2028
         let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
         let custody_columns_count = self.data_availability_checker.get_custody_columns_count();
+        // If the block was made available via custody columns received from gossip / RPC, use
+        // them, since we already have them.
+        // Otherwise, the blobs were likely made available by fetching from the EL, in which case
+        // we wait (blocking) for the data columns to be computed.
         let data_columns = data_columns
             .filter(|a| a.len() >= custody_columns_count)
             .or_else(|| data_column_recv?.blocking_recv());
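The combinator chain above reads: keep the columns that arrived over the network if there are at least `custody_columns_count` of them, otherwise fall back to the receiver that delivers the locally computed columns. A small standalone sketch of the same `Option` shape, with integers in place of column sidecars and a closure in place of the blocking receiver (names are illustrative):

    fn choose_columns(
        received: Option<Vec<u64>>,
        custody_count: usize,
        compute: impl FnOnce() -> Option<Vec<u64>>,
    ) -> Option<Vec<u64>> {
        received
            // Keep the gossip/RPC columns only if we have enough of them.
            .filter(|cols| cols.len() >= custody_count)
            // Otherwise fall back to the computed columns (blocking in the real code).
            .or_else(compute)
    }

    fn main() {
        // Enough columns arrived over the network: use them as-is.
        assert_eq!(
            choose_columns(Some(vec![1, 2, 3]), 2, || Some(vec![9])),
            Some(vec![1, 2, 3])
        );
        // Too few arrived: fall back to computing them.
        assert_eq!(choose_columns(Some(vec![1]), 2, || Some(vec![9])), Some(vec![9]));
    }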
beacon_node/beacon_chain/src/fetch_blobs.rs (15 changes: 13 additions & 2 deletions)
@@ -1,3 +1,12 @@
+//! This module implements an optimisation to fetch blobs via JSON-RPC from the EL.
+//! If a blob has already been seen in the public mempool, it is often unnecessary to wait for
+//! it to arrive on P2P gossip. This module uses a new JSON-RPC method (`engine_getBlobsV1`)
+//! which allows the CL to load the blobs quickly from the EL's blob pool.
+//!
+//! Once the node fetches the blobs from the EL, it publishes the remaining blobs that haven't
+//! yet been seen on P2P gossip to the network. From PeerDAS onwards, with the increased blob
+//! count, broadcasting blobs requires much higher bandwidth, and is only done by high-capacity
+//! supernodes.
 use crate::observed_data_sidecars::ObservableDataSidecar;
 use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ExecutionPayloadError};
 use itertools::Either;
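The flow the module doc describes (ask the EL for blobs by versioned hash, keep whatever its pool already has) can be sketched as follows. `EngineApi` and `get_blobs` are hypothetical stand-ins for Lighthouse's execution-layer client, not its real API; per the Engine API spec, `engine_getBlobsV1` returns one entry per requested hash, with `null` where the pool has no match:

    struct BlobAndProof {
        blob: Vec<u8>,
        proof: [u8; 48],
    }

    trait EngineApi {
        // Wraps the `engine_getBlobsV1` JSON-RPC call.
        fn get_blobs(&self, versioned_hashes: &[[u8; 32]]) -> Vec<Option<BlobAndProof>>;
    }

    fn fetch_known_blobs<E: EngineApi>(
        el: &E,
        versioned_hashes: &[[u8; 32]],
    ) -> Vec<(usize, BlobAndProof)> {
        el.get_blobs(versioned_hashes)
            .into_iter()
            .enumerate()
            // Keep only blobs the EL already had, remembering each one's index
            // so its sidecar can be built at the right position.
            .filter_map(|(i, maybe_blob)| Some((i, maybe_blob?)))
            .collect()
    }

    struct MockEl;
    impl EngineApi for MockEl {
        fn get_blobs(&self, hashes: &[[u8; 32]]) -> Vec<Option<BlobAndProof>> {
            // Pretend only the first requested blob is in the pool.
            hashes
                .iter()
                .enumerate()
                .map(|(i, _)| (i == 0).then(|| BlobAndProof { blob: vec![0], proof: [0; 48] }))
                .collect()
        }
    }

    fn main() {
        let found = fetch_known_blobs(&MockEl, &[[0; 32], [1; 32]]);
        assert_eq!(found.len(), 1); // only the first blob was in the EL's pool
    }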
@@ -15,6 +24,8 @@ pub enum BlobsOrDataColumns<E: EthSpec> {
     DataColumns(DataColumnSidecarVec<E>),
 }

+/// Fetches blobs from the EL mempool and processes them. It also broadcasts unseen blobs or
+/// data columns (PeerDAS onwards) to the network, using the supplied `publish_fn`.
 pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
     chain: Arc<BeaconChain<T>>,
     block_root: Hash256,
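A sketch of how a caller might supply `publish_fn`, assuming it is a callback invoked with the sidecars to broadcast (the exact signature in Lighthouse may differ; the enum variants here carry stand-in payloads):

    enum BlobsOrDataColumns {
        Blobs(Vec<u64>),       // stand-in for blob sidecars
        DataColumns(Vec<u64>), // stand-in for data column sidecars
    }

    fn fetch_and_process(publish_fn: impl Fn(BlobsOrDataColumns)) {
        // ... after fetching from the EL, broadcast what gossip hasn't seen:
        publish_fn(BlobsOrDataColumns::Blobs(vec![1, 2]));
    }

    fn main() {
        fetch_and_process(|msg| match msg {
            BlobsOrDataColumns::Blobs(b) => println!("publishing {} blobs", b.len()),
            BlobsOrDataColumns::DataColumns(c) => println!("publishing {} columns", c.len()),
        });
    }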
@@ -81,7 +92,7 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
             .enumerate()
             .filter_map(|(i, opt_blob)| Some((i, opt_blob?)))
         {
-            match BlobSidecar::new_efficiently(
+            match BlobSidecar::new_with_existing_proof(
                 i,
                 blob_and_proof.blob,
                 &block,
@@ -202,7 +213,7 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
             block.slot(),
             block_root,
             fixed_blob_sidecar_list.clone(),
-            Some(data_columns_receiver),
+            peer_das_enabled.then_some(data_columns_receiver),
         )
         .await
         .map(|_| debug!(chain.log, "Blobs from EL - processed"))
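`bool::then_some` is what makes the receiver conditional: the caller now passes the data-column receiver through only when PeerDAS is enabled, instead of always wrapping it in `Some`. Its semantics in isolation:

    fn main() {
        let rx = "data_columns_receiver"; // stand-in for the real channel receiver
        let peer_das_enabled = true;
        // `then_some` yields `Some(value)` when the flag is true, `None` otherwise.
        assert_eq!(peer_das_enabled.then_some(rx), Some(rx));
        assert_eq!(false.then_some(rx), None);
    }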
consensus/types/src/blob_sidecar.rs (2 changes: 1 addition & 1 deletion)
@@ -149,7 +149,7 @@ impl<E: EthSpec> BlobSidecar<E> {
         })
     }

-    pub fn new_efficiently(
+    pub fn new_with_existing_proof(
         index: usize,
         blob: Blob<E>,
         signed_block: &SignedBeaconBlock<E>,