Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Builder flow for Deneb / Blobs #4345

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 108 additions & 73 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ use crate::validator_monitor::{
};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{metrics, BeaconChainError, BeaconForkChoiceStore, BeaconSnapshot, CachedHead};
use eth2::types::{EventKind, SseBlock, SseExtendedPayloadAttributes, SyncDuty};
use eth2::types::{
BlobRootsWrapper, BlobsOrBlobRoots, BlobsWrapper, EventKind, SseBlock,
SseExtendedPayloadAttributes, SyncDuty,
};
use execution_layer::{
BlockProposalContents, BuilderParams, ChainHealth, ExecutionLayer, FailedCondition,
PayloadAttributes, PayloadStatus,
Expand Down Expand Up @@ -115,9 +118,8 @@ use store::{
use task_executor::{ShutdownReason, TaskExecutor};
use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::beacon_block_body::KzgCommitments;
use types::beacon_state::CloneConfig;
use types::blob_sidecar::{BlobSidecarList, Blobs};
use types::blob_sidecar::{BlindedBlobSidecar, BlindedBlobSidecarList, BlobSidecarList};
use types::consts::deneb::MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS;
use types::*;

Expand Down Expand Up @@ -465,7 +467,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub validator_monitor: RwLock<ValidatorMonitor<T::EthSpec>>,
/// The slot at which blocks are downloaded back to.
pub genesis_backfill_slot: Slot,
pub proposal_blob_cache: BlobCache<T::EthSpec>,
pub proposal_blob_cache: BlobCache<BlobSidecarList<T::EthSpec>>,
pub proposal_blinded_blob_cache: BlobCache<BlindedBlobSidecarList<T::EthSpec>>,
pub data_availability_checker: Arc<DataAvailabilityChecker<T>>,
pub kzg: Option<Arc<Kzg>>,
}
Expand Down Expand Up @@ -4735,7 +4738,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
bls_to_execution_changes,
} = partial_beacon_block;

let (inner_block, blobs_opt, proofs_opt) = match &state {
let (inner_block, blobs_or_blobs_root_opt, proofs_opt) = match &state {
BeaconState::Base(_) => (
BeaconBlock::Base(BeaconBlockBase {
slot,
Expand Down Expand Up @@ -4842,7 +4845,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
}
BeaconState::Deneb(_) => {
let (payload, kzg_commitments, blobs, proofs) = block_contents
let (payload, kzg_commitments, blobs_or_blob_roots, proofs) = block_contents
.ok_or(BlockProductionError::MissingExecutionPayload)?
.deconstruct();
(
Expand Down Expand Up @@ -4870,7 +4873,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.ok_or(BlockProductionError::InvalidPayloadFork)?,
},
}),
blobs,
blobs_or_blob_roots,
proofs,
)
}
Expand Down Expand Up @@ -4924,7 +4927,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

//FIXME(sean)
// - add a new timer for processing here
if let Some(blobs) = blobs_opt {
if let Some(blobs_or_blob_roots) = blobs_or_blobs_root_opt {
let kzg = self
.kzg
.as_ref()
Expand All @@ -4936,58 +4939,89 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
})?;

if expected_kzg_commitments.len() != blobs.len() {
if expected_kzg_commitments.len() != blobs_or_blob_roots.len() {
return Err(BlockProductionError::MissingKzgCommitment(format!(
"Missing KZG commitment for slot {}. Expected {}, got: {}",
slot,
blobs.len(),
blobs_or_blob_roots.len(),
expected_kzg_commitments.len()
)));
}

let kzg_proofs = if let Some(proofs) = proofs_opt {
Vec::from(proofs)
} else {
Self::compute_blob_kzg_proofs(kzg, &blobs, expected_kzg_commitments, slot)?
};
let kzg_proofs = proofs_opt.ok_or(BlockProductionError::MissingKzgProof)?;

kzg_utils::validate_blobs::<T::EthSpec>(
kzg,
expected_kzg_commitments,
&blobs,
&kzg_proofs,
)
.map_err(BlockProductionError::KzgError)?;

let blob_sidecars = BlobSidecarList::from(
blobs
.into_iter()
.enumerate()
.map(|(blob_index, blob)| {
let kzg_commitment = expected_kzg_commitments
.get(blob_index)
.expect("KZG commitment should exist for blob");

let kzg_proof = kzg_proofs
.get(blob_index)
.expect("KZG proof should exist for blob");

Ok(Arc::new(BlobSidecar {
block_root: beacon_block_root,
index: blob_index as u64,
slot,
block_parent_root: block.parent_root(),
proposer_index,
blob,
kzg_commitment: *kzg_commitment,
kzg_proof: *kzg_proof,
}))
})
.collect::<Result<Vec<_>, BlockProductionError>>()?,
);
match blobs_or_blob_roots {
BlobsOrBlobRoots::Blobs(BlobsWrapper { blobs }) => {
kzg_utils::validate_blobs::<T::EthSpec>(
kzg,
expected_kzg_commitments,
&blobs,
&kzg_proofs,
)
.map_err(BlockProductionError::KzgError)?;

let blob_sidecars = BlobSidecarList::from(
blobs
.into_iter()
.enumerate()
.map(|(blob_index, blob)| {
let kzg_commitment = expected_kzg_commitments
.get(blob_index)
.expect("KZG commitment should exist for blob");

let kzg_proof = kzg_proofs
.get(blob_index)
.expect("KZG proof should exist for blob");

Ok(Arc::new(BlobSidecar {
block_root: beacon_block_root,
index: blob_index as u64,
slot,
block_parent_root: block.parent_root(),
proposer_index,
blob,
kzg_commitment: *kzg_commitment,
kzg_proof: *kzg_proof,
}))
})
.collect::<Result<Vec<_>, BlockProductionError>>()?,
);

self.proposal_blob_cache
.put(beacon_block_root, blob_sidecars);
self.proposal_blob_cache
.put(beacon_block_root, blob_sidecars);
}
BlobsOrBlobRoots::BlobRoots(BlobRootsWrapper { blob_roots }) => {
let blinded_blob_sidecars = BlindedBlobSidecarList::<T::EthSpec>::from(
blob_roots
.into_iter()
.enumerate()
.map(|(blob_index, blob_root)| {
let kzg_commitment = expected_kzg_commitments
.get(blob_index)
.expect("KZG commitment should exist for blob");

let kzg_proof = kzg_proofs
.get(blob_index)
.expect("KZG proof should exist for blob");

Ok(Arc::new(BlindedBlobSidecar {
block_root: beacon_block_root,
index: blob_index as u64,
slot,
block_parent_root: block.parent_root(),
proposer_index,
blob_root,
kzg_commitment: *kzg_commitment,
kzg_proof: *kzg_proof,
}))
})
.collect::<Result<Vec<_>, BlockProductionError>>()?,
);

self.proposal_blinded_blob_cache
.put(beacon_block_root, blinded_blob_sidecars);
}
}
}

metrics::inc_counter(&metrics::BLOCK_PRODUCTION_SUCCESSES);
Expand All @@ -5003,28 +5037,29 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok((block, state))
}

fn compute_blob_kzg_proofs(
kzg: &Arc<Kzg>,
blobs: &Blobs<T::EthSpec>,
expected_kzg_commitments: &KzgCommitments<T::EthSpec>,
slot: Slot,
) -> Result<Vec<KzgProof>, BlockProductionError> {
blobs
.iter()
.enumerate()
.map(|(blob_index, blob)| {
let kzg_commitment = expected_kzg_commitments.get(blob_index).ok_or(
BlockProductionError::MissingKzgCommitment(format!(
"Missing KZG commitment for slot {} blob index {}",
slot, blob_index
)),
)?;

kzg_utils::compute_blob_kzg_proof::<T::EthSpec>(kzg, blob, *kzg_commitment)
.map_err(BlockProductionError::KzgError)
})
.collect::<Result<Vec<KzgProof>, BlockProductionError>>()
}
// FIXME(jimmy) when do we need this?
// fn compute_blob_kzg_proofs(
// kzg: &Arc<Kzg>,
// blobs: &Blobs<T::EthSpec>,
// expected_kzg_commitments: &KzgCommitments<T::EthSpec>,
// slot: Slot,
// ) -> Result<Vec<KzgProof>, BlockProductionError> {
// blobs
// .iter()
// .enumerate()
// .map(|(blob_index, blob)| {
// let kzg_commitment = expected_kzg_commitments.get(blob_index).ok_or(
// BlockProductionError::MissingKzgCommitment(format!(
// "Missing KZG commitment for slot {} blob index {}",
// slot, blob_index
// )),
// )?;
//
// kzg_utils::compute_blob_kzg_proof::<T::EthSpec>(kzg, blob, *kzg_commitment)
// .map_err(BlockProductionError::KzgError)
// })
// .collect::<Result<Vec<KzgProof>, BlockProductionError>>()
// }

/// This method must be called whenever an execution engine indicates that a payload is
/// invalid.
Expand Down
18 changes: 7 additions & 11 deletions beacon_node/beacon_chain/src/blob_cache.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,31 @@
use lru::LruCache;
use parking_lot::Mutex;
use types::{BlobSidecarList, EthSpec, Hash256};
use types::Hash256;

pub const DEFAULT_BLOB_CACHE_SIZE: usize = 10;

/// A cache blobs by beacon block root.
pub struct BlobCache<T: EthSpec> {
blobs: Mutex<LruCache<BlobCacheId, BlobSidecarList<T>>>,
pub struct BlobCache<T> {
blobs: Mutex<LruCache<BlobCacheId, T>>,
}

#[derive(Hash, PartialEq, Eq)]
struct BlobCacheId(Hash256);

impl<T: EthSpec> Default for BlobCache<T> {
impl<T> Default for BlobCache<T> {
fn default() -> Self {
BlobCache {
blobs: Mutex::new(LruCache::new(DEFAULT_BLOB_CACHE_SIZE)),
}
}
}

impl<T: EthSpec> BlobCache<T> {
pub fn put(
&self,
beacon_block: Hash256,
blobs: BlobSidecarList<T>,
) -> Option<BlobSidecarList<T>> {
impl<T> BlobCache<T> {
pub fn put(&self, beacon_block: Hash256, blobs: T) -> Option<T> {
self.blobs.lock().put(BlobCacheId(beacon_block), blobs)
}

pub fn pop(&self, root: &Hash256) -> Option<BlobSidecarList<T>> {
pub fn pop(&self, root: &Hash256) -> Option<T> {
self.blobs.lock().pop(&BlobCacheId(*root))
}
}
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,7 @@ where
.map_err(|e| format!("Error initializing DataAvailabiltyChecker: {:?}", e))?,
),
proposal_blob_cache: BlobCache::default(),
proposal_blinded_blob_cache: BlobCache::default(),
kzg,
};

Expand Down
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,15 @@ pub enum BlockProductionError {
payload_block_hash: ExecutionBlockHash,
},
NoBlobsCached,
NoBlindedBlobsCached,
FailedToReadFinalizedBlock(store::Error),
MissingFinalizedBlock(Hash256),
BlockTooLarge(usize),
ShuttingDown,
MissingSyncAggregate,
MissingExecutionPayload,
MissingKzgCommitment(String),
MissingKzgProof,
TokioJoin(tokio::task::JoinError),
BeaconChain(BeaconChainError),
InvalidPayloadFork,
Expand Down
Loading