Improving blob propagation post-PeerDAS with Decentralized Blob Building #6268

Open · wants to merge 37 commits into unstable from das-fetch-blobs

Commits (37 total; the changes below are shown from 24 of them):
662d6cf
Get blobs from EL.
jimmygchen Aug 29, 2024
0c23848
Avoid cloning blobs after fetching blobs.
jimmygchen Aug 29, 2024
89dfaaa
Address review comments and refactor code.
jimmygchen Aug 29, 2024
401231b
Fix lint.
jimmygchen Aug 29, 2024
2efc99b
Move blob computation metric to the right spot.
jimmygchen Aug 29, 2024
db6318e
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Aug 30, 2024
aa79ec6
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Sep 5, 2024
36b23b2
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Sep 9, 2024
6bff4ab
Gradual publication of data columns for supernodes.
michaelsproul Sep 13, 2024
7977999
Recompute head after importing block with blobs from the EL.
jimmygchen Sep 13, 2024
5e75527
Fix lint
jimmygchen Sep 17, 2024
3444281
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Sep 17, 2024
e76d21f
Use blocking task instead of async when computing cells.
jimmygchen Sep 19, 2024
4b2956f
Merge branch 'das-fetch-blobs' of github.com:jimmygchen/lighthouse in…
jimmygchen Sep 19, 2024
a6fbb3c
Merge remote-tracking branch 'origin/unstable' into das-fetch-blobs
michaelsproul Sep 27, 2024
4639146
Fix semantic conflicts
michaelsproul Sep 27, 2024
41c2e7d
Downgrade error log.
jimmygchen Oct 4, 2024
d8eedf3
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Oct 17, 2024
a65030b
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Oct 17, 2024
0a2c6f7
Publish block without waiting for blob and column proof computation.
jimmygchen Oct 18, 2024
0a9d5a0
Address review comments and refactor.
jimmygchen Oct 18, 2024
6511c93
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Oct 18, 2024
7939888
Fix test and docs.
jimmygchen Oct 18, 2024
5ec9756
Comment cleanups.
jimmygchen Oct 18, 2024
422fc1b
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Oct 22, 2024
bf19769
Address review comments and cleanup
jimmygchen Oct 22, 2024
6e44763
Address review comments and cleanup
jimmygchen Oct 22, 2024
1872ae5
Refactor to de-duplicate gradual publication logic.
jimmygchen Oct 28, 2024
81493f5
Add more logging.
jimmygchen Oct 28, 2024
4278e6a
Merge remote-tracking branch 'origin/unstable' into das-fetch-blobs
jimmygchen Oct 29, 2024
756230e
Fix incorrect comparison on `num_fetched_blobs`.
jimmygchen Oct 29, 2024
0f32b73
Implement gradual blob publication.
jimmygchen Oct 29, 2024
dfdfccb
Merge branch 'unstable' into das-fetch-blobs
jimmygchen Oct 29, 2024
1701565
Inline `publish_fn`.
jimmygchen Oct 29, 2024
b081dfc
Merge branch 'das-fetch-blobs' of github.com:jimmygchen/lighthouse in…
jimmygchen Oct 29, 2024
1aa125e
Gossip verify blobs before publishing
michaelsproul Nov 1, 2024
7fa2f44
Avoid queries for 0 blobs and error for duplicates
michaelsproul Nov 1, 2024
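
Taken together, these commits add decentralized blob building: when a node sees a block before its blobs, it asks its local execution layer for any matching blobs already sitting in the EL's mempool, and if they are returned it builds, imports, and publishes the sidecars itself instead of waiting for the proposer's gossip. The Rust sketch below reconstructs that flow from the commit messages; every helper name in it is an assumption, with only the `engine_getBlobsV1` engine API method being standard.

// A sketch of the fetch-blobs-from-EL flow, assuming hypothetical helpers.
async fn try_fetch_blobs_from_el(
    chain: Arc<BeaconChain>,
    block_root: Hash256,
    block: Arc<SignedBeaconBlock>,
) {
    // Blobs are identified by the versioned hashes of the block's KZG commitments.
    let versioned_hashes = versioned_hashes_for_block(&block); // hypothetical helper

    // Ask the local execution layer for matching blobs from its mempool
    // (exposed by ELs as `engine_getBlobsV1`).
    let Ok(Some(blobs_and_proofs)) = chain.execution_layer.get_blobs(versioned_hashes).await
    else {
        return; // Not available locally; rely on gossip as before.
    };

    // Wrap the returned blobs and proofs into sidecars, hand them to the
    // availability checker (see `put_engine_blobs` in the diff below), then
    // publish them, batched gradually in the supernode case.
    let blobs = build_blob_sidecars(&block, blobs_and_proofs); // hypothetical helper
    chain.process_engine_blobs(block_root, blobs).await; // hypothetical entry point
}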
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

13 changes: 8 additions & 5 deletions beacon_node/beacon_chain/benches/benches.rs
@@ -37,12 +37,15 @@ fn all_benches(c: &mut Criterion) {

let kzg = get_kzg(&spec);
for blob_count in [1, 2, 3, 6] {
let kzg = kzg.clone();
let (signed_block, blob_sidecars) = create_test_block_and_blobs::<E>(blob_count, &spec);
let (signed_block, blobs) = create_test_block_and_blobs::<E>(blob_count, &spec);

let column_sidecars =
blobs_to_data_column_sidecars(&blob_sidecars, &signed_block, &kzg.clone(), &spec)
.unwrap();
let column_sidecars = blobs_to_data_column_sidecars(
&blobs.iter().collect::<Vec<_>>(),
&signed_block,
&kzg,
&spec,
)
.unwrap();

let spec = spec.clone();

286 changes: 194 additions & 92 deletions beacon_node/beacon_chain/src/beacon_chain.rs

Large diffs are not rendered by default.

25 changes: 19 additions & 6 deletions beacon_node/beacon_chain/src/blob_verification.rs
@@ -12,7 +12,6 @@ use crate::{metrics, BeaconChainError};
use kzg::{Error as KzgError, Kzg, KzgCommitment};
use slog::debug;
use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
use std::time::Duration;
use tree_hash::TreeHash;
use types::blob_sidecar::BlobIdentifier;
@@ -156,11 +155,6 @@ impl From<BeaconStateError> for GossipBlobError {
}
}

pub type GossipVerifiedBlobList<T> = VariableList<
GossipVerifiedBlob<T>,
<<T as BeaconChainTypes>::EthSpec as EthSpec>::MaxBlobsPerBlock,
>;

/// A wrapper around a `BlobSidecar` that indicates it has been approved for re-gossiping on
/// the p2p network.
#[derive(Debug)]
@@ -335,6 +329,25 @@ impl<E: EthSpec> KzgVerifiedBlobList<E> {
verified_blobs: blobs,
})
}

/// Create a `KzgVerifiedBlobList` from `blobs` that are already KZG verified.
///
/// This should be used with caution, as used incorrectly it could result in KZG verification
/// being skipped and invalid blobs being deemed valid.
pub fn from_verified<I: IntoIterator<Item = Arc<BlobSidecar<E>>>>(
blobs: I,
seen_timestamp: Duration,
) -> Self {
Self {
verified_blobs: blobs
.into_iter()
.map(|blob| KzgVerifiedBlob {
blob,
seen_timestamp,
})
.collect(),
}
}
}

impl<E: EthSpec> IntoIterator for KzgVerifiedBlobList<E> {
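
For illustration, the new constructor would be used roughly like this when blobs arrive from the execution layer with proofs already checked; a minimal sketch, assuming the surrounding context (`engine_blobs`, `slot_clock`), with only `from_verified` itself taken from this diff:

// Assumed: `engine_blobs: Vec<Arc<BlobSidecar<E>>>` came from the EL, which
// verified the KZG proofs when the blobs entered its mempool, so we skip
// re-verification here.
let seen_timestamp = slot_clock
    .now_duration()
    .ok_or(AvailabilityCheckError::SlotClockError)?;
let verified_blobs = KzgVerifiedBlobList::from_verified(engine_blobs, seen_timestamp);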
6 changes: 3 additions & 3 deletions beacon_node/beacon_chain/src/block_verification.rs
@@ -683,7 +683,7 @@ pub struct SignatureVerifiedBlock<T: BeaconChainTypes> {
consensus_context: ConsensusContext<T::EthSpec>,
}

/// Used to await the result of executing payload with a remote EE.
/// Used to await the result of executing payload with an EE.
type PayloadVerificationHandle = JoinHandle<Option<Result<PayloadVerificationOutcome, BlockError>>>;

/// A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and
@@ -750,7 +750,8 @@ pub fn build_blob_data_column_sidecars<T: BeaconChainTypes>(
&metrics::DATA_COLUMN_SIDECAR_COMPUTATION,
&[&blobs.len().to_string()],
);
let sidecars = blobs_to_data_column_sidecars(&blobs, block, &chain.kzg, &chain.spec)
let blob_refs = blobs.iter().collect::<Vec<_>>();
let sidecars = blobs_to_data_column_sidecars(&blob_refs, block, &chain.kzg, &chain.spec)
.discard_timer_on_break(&mut timer)?;
drop(timer);
Ok(sidecars)
@@ -1343,7 +1344,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
/*
* Perform cursory checks to see if the block is even worth processing.
*/

check_block_relevancy(block.as_block(), block_root, chain)?;

// Define a future that will verify the execution payload with an execution engine.
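
In the `build_blob_data_column_sidecars` hunk above, `blobs_to_data_column_sidecars` now takes a slice of blob references rather than an owned list, so callers collect references first and avoid cloning every sidecar. A sketch of the call shape, with the variable names assumed:

// `blobs` owns the Arc'd sidecars; only cheap references are passed down.
let blob_refs: Vec<_> = blobs.iter().collect();
let column_sidecars = blobs_to_data_column_sidecars(&blob_refs, &block, &kzg, &spec)?;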
6 changes: 6 additions & 0 deletions beacon_node/beacon_chain/src/chain_config.rs
@@ -88,6 +88,10 @@ pub struct ChainConfig {
pub malicious_withhold_count: usize,
/// Enable peer sampling on blocks.
pub enable_sampling: bool,
/// Number of batches that supernodes split data columns into during publishing by a non-proposer.
pub supernode_data_column_publication_batches: usize,
/// The delay applied by supernodes between the sending of each data column batch.
pub supernode_data_column_publication_batch_interval: Duration,
}

impl Default for ChainConfig {
@@ -121,6 +125,8 @@ impl Default for ChainConfig {
enable_light_client_server: false,
malicious_withhold_count: 0,
enable_sampling: false,
supernode_data_column_publication_batches: 4,
supernode_data_column_publication_batch_interval: Duration::from_millis(200),
}
}
}
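
With these defaults, a supernode that obtains a full set of data columns publishes them in 4 batches spaced 200 ms apart, smoothing the bandwidth spike of pushing every column at once. A minimal sketch of the batching loop under those config values; `publish_fn` and the function itself are assumptions, not the PR's code:

// Publish columns in `supernode_data_column_publication_batches` chunks,
// sleeping for the configured interval between chunks.
async fn publish_columns_gradually<E: EthSpec>(
    columns: Vec<Arc<DataColumnSidecar<E>>>,
    config: &ChainConfig,
    publish_fn: impl Fn(Vec<Arc<DataColumnSidecar<E>>>),
) {
    let batch_size = columns
        .len()
        .div_ceil(config.supernode_data_column_publication_batches)
        .max(1);
    for batch in columns.chunks(batch_size) {
        publish_fn(batch.to_vec());
        tokio::time::sleep(config.supernode_data_column_publication_batch_interval).await;
    }
}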
49 changes: 27 additions & 22 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -18,7 +18,7 @@ use task_executor::TaskExecutor;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{
BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList,
Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, Slot,
Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock,
};

mod error;
@@ -201,7 +200,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn put_rpc_blobs(
&self,
block_root: Hash256,
epoch: Epoch,
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let seen_timestamp = self
@@ -212,15 +211,12 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
// Note: currently not reporting which specific blob is invalid because we fetch all blobs
// from the same peer for both lookup and range sync.

let verified_blobs = KzgVerifiedBlobList::new(
Vec::from(blobs).into_iter().flatten(),
&self.kzg,
seen_timestamp,
)
.map_err(AvailabilityCheckError::InvalidBlobs)?;
let verified_blobs =
KzgVerifiedBlobList::new(blobs.iter().flatten().cloned(), &self.kzg, seen_timestamp)
.map_err(AvailabilityCheckError::InvalidBlobs)?;

self.availability_cache
.put_kzg_verified_blobs(block_root, epoch, verified_blobs, &self.log)
.put_kzg_verified_blobs(block_root, verified_blobs, &self.log)
}

/// Put a list of custody columns received via RPC into the availability cache. This performs KZG
@@ -229,7 +225,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn put_rpc_custody_columns(
&self,
block_root: Hash256,
epoch: Epoch,
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
// TODO(das): report which column is invalid for proper peer scoring
@@ -248,12 +243,32 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {

self.availability_cache.put_kzg_verified_data_columns(
block_root,
epoch,
verified_custody_columns,
&self.log,
)
}

/// Put a list of blobs received from the EL pool into the availability cache.
///
/// This DOES NOT perform KZG verification because the KZG proofs should have been constructed
/// immediately prior to calling this function so they are assumed to be valid.
pub fn put_engine_blobs(
&self,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let seen_timestamp = self
.slot_clock
.now_duration()
.ok_or(AvailabilityCheckError::SlotClockError)?;

let verified_blobs =
KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp);

self.availability_cache
.put_kzg_verified_blobs(block_root, verified_blobs, &self.log)
}

/// Check if we've cached other blobs for this block. If it completes a set and we also
/// have a block cached, return the `Availability` variant triggering block import.
/// Otherwise cache the blob sidecar.
@@ -265,7 +280,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
self.availability_cache.put_kzg_verified_blobs(
gossip_blob.block_root(),
gossip_blob.epoch(),
vec![gossip_blob.into_inner()],
&self.log,
)
@@ -279,20 +293,16 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
#[allow(clippy::type_complexity)]
pub fn put_gossip_data_columns(
&self,
slot: Slot,
block_root: Hash256,
gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());

let custody_columns = gossip_data_columns
.into_iter()
.map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
.collect::<Vec<_>>();

self.availability_cache.put_kzg_verified_data_columns(
block_root,
epoch,
custody_columns,
&self.log,
)
@@ -595,12 +605,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
);

self.availability_cache
.put_kzg_verified_data_columns(
*block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
data_columns_to_publish.clone(),
&self.log,
)
.put_kzg_verified_data_columns(*block_root, data_columns_to_publish.clone(), &self.log)
.map(|availability| {
DataColumnReconstructionResult::Success((
availability,
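
On the caller side, `put_engine_blobs` feeds EL-provided blobs into the same availability accounting as gossip and RPC blobs. A sketch of how a caller might handle the result; the `Availability` variant names and the import step are assumptions based on the surrounding code, not taken from this diff:

// Assumed: `blobs` is a FixedBlobSidecarList built from EL-returned blobs.
match chain
    .data_availability_checker
    .put_engine_blobs(block_root, blobs)?
{
    Availability::Available(block) => {
        // The engine blobs completed this block's data; import it now and
        // recompute the head (per the commit messages above).
        import_available_block(&chain, block).await; // hypothetical import step
    }
    Availability::MissingComponents(_) => {
        // Still incomplete; remaining blobs or columns arrive via gossip/RPC.
    }
}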
beacon_node/beacon_chain/src/data_availability_checker/error.rs
@@ -1,4 +1,5 @@
use kzg::{Error as KzgError, KzgCommitment};
use types::data_column_sidecar::DataColumnSidecarError;
use types::{BeaconStateError, ColumnIndex, Hash256};

#[derive(Debug)]
@@ -23,6 +24,7 @@ pub enum Error {
BlockReplayError(state_processing::BlockReplayError),
RebuildingStateCaches(BeaconStateError),
SlotClockError,
DataColumnSidecarError(DataColumnSidecarError),
}

#[derive(PartialEq, Eq)]
@@ -46,7 +48,8 @@ impl Error {
| Error::BlockReplayError(_)
| Error::UnableToDetermineImportRequirement
| Error::RebuildingStateCaches(_)
| Error::SlotClockError => ErrorCategory::Internal,
| Error::SlotClockError
| Error::DataColumnSidecarError(_) => ErrorCategory::Internal,
Error::InvalidBlobs { .. }
| Error::InvalidColumn { .. }
| Error::ReconstructColumnsError { .. }
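
The new `DataColumnSidecarError` variant is categorized as internal because it signals a local failure while building sidecars, not a faulty peer. The conversion presumably follows the usual pattern; a sketch, not copied from the PR:

impl From<DataColumnSidecarError> for Error {
    fn from(e: DataColumnSidecarError) -> Self {
        Error::DataColumnSidecarError(e)
    }
}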