Skip to content

Commit

Permalink
Refactor to de-duplicate gradual publication logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmygchen committed Oct 28, 2024
1 parent 6e44763 commit 1872ae5
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 60 deletions.
42 changes: 2 additions & 40 deletions beacon_node/beacon_chain/src/fetch_blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use execution_layer::json_structures::BlobAndProofV1;
use execution_layer::Error as ExecutionLayerError;
use itertools::Either;
use lighthouse_metrics::{inc_counter, inc_counter_by, TryExt};
use rand::prelude::SliceRandom;
use slog::{debug, error, o, Logger};
use ssz_types::FixedVector;
use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
Expand Down Expand Up @@ -136,7 +135,6 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(

let data_columns_receiver = spawn_compute_and_publish_data_columns_task(
&chain,
block_root,
block.clone(),
fixed_blob_sidecar_list.clone(),
publish_fn,
Expand Down Expand Up @@ -194,7 +192,6 @@ pub async fn fetch_and_process_engine_blobs<T: BeaconChainTypes>(
/// while maintaining the invariant that block and data columns are persisted atomically.
fn spawn_compute_and_publish_data_columns_task<T: BeaconChainTypes>(
chain: &Arc<BeaconChain<T>>,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
blobs: FixedBlobSidecarList<T::EthSpec>,
publish_fn: impl Fn(BlobsOrDataColumns<T::EthSpec>) + Send + 'static,
Expand Down Expand Up @@ -222,7 +219,7 @@ fn spawn_compute_and_publish_data_columns_task<T: BeaconChainTypes>(
.discard_timer_on_break(&mut timer);
drop(timer);

let mut all_data_columns = match data_columns_result {
let all_data_columns = match data_columns_result {
Ok(d) => d,
Err(e) => {
error!(
Expand All @@ -249,42 +246,7 @@ fn spawn_compute_and_publish_data_columns_task<T: BeaconChainTypes>(
return;
}

// To reduce bandwidth for supernodes: permute the columns to publish and
// publish them in batches. Our hope is that some columns arrive from
// other supernodes in the meantime, obviating the need for us to publish
// them. If no other publisher exists for a column, it will eventually get
// published here.
// FIXME(das): deduplicate this with respect to the gossip/sync methods
all_data_columns.shuffle(&mut rand::thread_rng());

let supernode_data_column_publication_batches = chain_cloned
.config
.supernode_data_column_publication_batches;
let supernode_data_column_publication_batch_interval = chain_cloned
.config
.supernode_data_column_publication_batch_interval;

let batch_size = all_data_columns.len() / supernode_data_column_publication_batches;
for batch in all_data_columns.chunks(batch_size) {
let already_seen = chain_cloned
.data_availability_checker
.cached_data_column_indexes(&block_root)
.unwrap_or_default();
let publishable = batch
.iter()
.filter(|col| !already_seen.contains(&col.index()))
.cloned()
.collect::<Vec<_>>();
if !publishable.is_empty() {
debug!(
log,
"Publishing data columns from EL";
"count" => publishable.len()
);
publish_fn(BlobsOrDataColumns::DataColumns(publishable));
}
std::thread::sleep(supernode_data_column_publication_batch_interval);
}
publish_fn(BlobsOrDataColumns::DataColumns(all_data_columns));
},
"compute_and_publish_data_columns",
);
Expand Down
54 changes: 34 additions & 20 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,28 +880,35 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
});
}

fn publish_blobs_or_data_column(&self, blobs_or_data_column: BlobsOrDataColumns<T::EthSpec>) {
let messages = match blobs_or_data_column {
fn publish_blobs_or_data_column(
self: &Arc<Self>,
blobs_or_data_column: BlobsOrDataColumns<T::EthSpec>,
block_root: Hash256,
) {
match blobs_or_data_column {
BlobsOrDataColumns::Blobs(blobs) => {
debug!(self.log, "Publishing blobs from EL"; "count" => blobs.len());
blobs
debug!(
self.log,
"Publishing blobs from EL";
"count" => blobs.len(),
"block_root" => ?block_root,
);
let messages = blobs
.into_iter()
.map(|blob| PubsubMessage::BlobSidecar(Box::new((blob.index, blob))))
.collect()
.collect();
self.send_network_message(NetworkMessage::Publish { messages });
}
BlobsOrDataColumns::DataColumns(columns) => {
debug!(self.log, "Publishing data columns built from EL blobs"; "count" => columns.len());
columns.into_iter().map(|column| {
let subnet = DataColumnSubnetId::from_column_index::<T::EthSpec>(
column.index as usize,
&self.chain.spec,
);
PubsubMessage::DataColumnSidecar(Box::new((subnet, column)))
})
debug!(
self.log,
"Publishing data columns built from EL blobs";
"count" => columns.len(),
"block_root" => ?block_root,
);
self.publish_data_columns_gradually(columns, block_root);
}
.collect(),
};
self.send_network_message(NetworkMessage::Publish { messages })
}

pub async fn fetch_engine_blobs_and_publish(
Expand All @@ -911,7 +918,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
) {
let self_cloned = self.clone();
let publish_fn = move |blobs_or_data_column| {
self_cloned.publish_blobs_or_data_column(blobs_or_data_column)
self_cloned.publish_blobs_or_data_column(blobs_or_data_column, block_root)
};

match fetch_and_process_engine_blobs(
Expand Down Expand Up @@ -971,7 +978,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let result = self.chain.reconstruct_data_columns(block_root).await;
match result {
Ok(Some((availability_processing_status, data_columns_to_publish))) => {
self.handle_data_columns_to_publish(data_columns_to_publish, block_root);
self.publish_data_columns_gradually(data_columns_to_publish, block_root);
match &availability_processing_status {
AvailabilityProcessingStatus::Imported(hash) => {
debug!(
Expand Down Expand Up @@ -1015,7 +1022,13 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}

fn handle_data_columns_to_publish(
/// This function gradually publishes data columns to the network in randomised batches.
///
/// This is an optimisation to reduce outbound bandwidth and ensure each column is published
/// by some nodes on the network as soon as possible. Our hope is that some columns arrive from
/// other supernodes in the meantime, obviating the need for us to publish them. If no other
/// publisher exists for a column, it will eventually get published here.
fn publish_data_columns_gradually(
self: &Arc<Self>,
mut data_columns_to_publish: DataColumnSidecarList<T::EthSpec>,
block_root: Hash256,
Expand Down Expand Up @@ -1068,15 +1081,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
debug!(
self_clone.chain.logger(),
"Publishing data column batch";
"count" => publishable.len()
"count" => publishable.len(),
"block_root" => ?block_root,
);
publish_fn(publishable);
}

tokio::time::sleep(supernode_data_column_publication_batch_interval).await;
}
},
"handle_data_columns_publish",
"publish_data_columns_gradually",
);
}
}
Expand Down

0 comments on commit 1872ae5

Please sign in to comment.