From acd31511843db3cce03551624a3f95447b4339ae Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Fri, 2 Aug 2024 22:42:11 +1000 Subject: [PATCH] Import gossip data column into data availability checker (#6197) * Import gossip data column into data availability checker --- beacon_node/beacon_chain/src/beacon_chain.rs | 36 ++++++++----- .../src/block_verification_types.rs | 3 +- .../src/data_availability_checker.rs | 33 +++++++++--- .../overflow_lru_cache.rs | 51 ++++++++++++------- .../src/data_column_verification.rs | 19 +++++++ .../beacon_chain/src/early_attester_cache.rs | 13 ++++- .../beacon_chain/src/historical_blocks.rs | 10 +++- beacon_node/beacon_chain/tests/store_tests.rs | 4 +- 8 files changed, 128 insertions(+), 41 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 4bc98a98da0..8cd991cc103 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2959,9 +2959,9 @@ impl BeaconChain { self: &Arc, data_columns: Vec>, ) -> Result> { - let Ok(block_root) = data_columns + let Ok((slot, block_root)) = data_columns .iter() - .map(|c| c.block_root()) + .map(|c| (c.slot(), c.block_root())) .unique() .exactly_one() else { @@ -2981,7 +2981,7 @@ impl BeaconChain { } let r = self - .check_gossip_data_columns_availability_and_import(data_columns) + .check_gossip_data_columns_availability_and_import(slot, block_root, data_columns) .await; self.remove_notified_custody_columns(&block_root, r) } @@ -3298,6 +3298,8 @@ impl BeaconChain { /// if so, otherwise caches the data column in the data availability checker. async fn check_gossip_data_columns_availability_and_import( self: &Arc, + slot: Slot, + block_root: Hash256, data_columns: Vec>, ) -> Result> { if let Some(slasher) = self.slasher.as_ref() { @@ -3306,15 +3308,11 @@ impl BeaconChain { } } - let Ok(slot) = data_columns.iter().map(|c| c.slot()).unique().exactly_one() else { - return Err(BlockError::InternalError( - "Columns for the same block should have matching slot".to_string(), - )); - }; - - let availability = self - .data_availability_checker - .put_gossip_data_columns(data_columns)?; + let availability = self.data_availability_checker.put_gossip_data_columns( + slot, + block_root, + data_columns, + )?; self.process_availability(slot, availability).await } @@ -3629,7 +3627,7 @@ impl BeaconChain { // If the write fails, revert fork choice to the version from disk, else we can // end up with blocks in fork choice that are missing from disk. // See https://github.com/sigp/lighthouse/issues/2028 - let (_, signed_block, blobs) = signed_block.deconstruct(); + let (_, signed_block, blobs, data_columns) = signed_block.deconstruct(); let block = signed_block.message(); ops.extend( confirmed_state_roots @@ -3650,6 +3648,18 @@ impl BeaconChain { } } + if let Some(_data_columns) = data_columns { + // TODO(das): depends on https://github.com/sigp/lighthouse/pull/6073 + // if !data_columns.is_empty() { + // debug!( + // self.log, "Writing data_columns to store"; + // "block_root" => %block_root, + // "count" => data_columns.len(), + // ); + // ops.push(StoreOp::PutDataColumns(block_root, data_columns)); + // } + } + let txn_lock = self.store.hot_db.begin_rw_transaction(); if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) { diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 70f1e99ef74..426c41bfeab 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -517,7 +517,8 @@ impl AsBlock for AvailableBlock { } fn into_rpc_block(self) -> RpcBlock { - let (block_root, block, blobs_opt) = self.deconstruct(); + // TODO(das): rpc data columns to be merged from `das` branch + let (block_root, block, blobs_opt, _data_columns_opt) = self.deconstruct(); // Circumvent the constructor here, because an Available block will have already had // consistency checks performed. let inner = match blobs_opt { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index fdba60a69ac..ce5995a5581 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -14,13 +14,16 @@ use std::sync::Arc; use std::time::Duration; use task_executor::TaskExecutor; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; -use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock}; +use types::{ + BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock, + Slot, +}; mod error; mod overflow_lru_cache; mod state_lru_cache; -use crate::data_column_verification::GossipVerifiedDataColumn; +use crate::data_column_verification::{GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn}; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; use types::non_zero_usize::new_non_zero_usize; @@ -191,10 +194,18 @@ impl DataAvailabilityChecker { pub fn put_gossip_data_columns( &self, - _gossip_data_columns: Vec>, + slot: Slot, + block_root: Hash256, + gossip_data_columns: Vec>, ) -> Result, AvailabilityCheckError> { - // TODO(das) to be implemented - Err(AvailabilityCheckError::Unexpected) + let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); + let custody_columns = gossip_data_columns + .into_iter() + .map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner())) + .collect::>(); + + self.availability_cache + .put_kzg_verified_data_columns(block_root, epoch, custody_columns) } /// Check if we have all the blobs for a block. Returns `Availability` which has information @@ -231,6 +242,7 @@ impl DataAvailabilityChecker { block_root, block, blobs: None, + data_columns: None, blobs_available_timestamp: None, })) } @@ -251,6 +263,7 @@ impl DataAvailabilityChecker { block_root, block, blobs: verified_blobs, + data_columns: None, blobs_available_timestamp: None, })) } @@ -297,6 +310,7 @@ impl DataAvailabilityChecker { block_root, block, blobs: None, + data_columns: None, blobs_available_timestamp: None, })) } @@ -312,6 +326,7 @@ impl DataAvailabilityChecker { block_root, block, blobs: verified_blobs, + data_columns: None, blobs_available_timestamp: None, })) } @@ -477,6 +492,7 @@ pub struct AvailableBlock { block_root: Hash256, block: Arc>, blobs: Option>, + data_columns: Option>, /// Timestamp at which this block first became available (UNIX timestamp, time since 1970). blobs_available_timestamp: Option, } @@ -486,11 +502,13 @@ impl AvailableBlock { block_root: Hash256, block: Arc>, blobs: Option>, + data_columns: Option>, ) -> Self { Self { block_root, block, blobs, + data_columns, blobs_available_timestamp: None, } } @@ -510,20 +528,23 @@ impl AvailableBlock { self.blobs_available_timestamp } + #[allow(clippy::type_complexity)] pub fn deconstruct( self, ) -> ( Hash256, Arc>, Option>, + Option>, ) { let AvailableBlock { block_root, block, blobs, + data_columns, blobs_available_timestamp: _, } = self; - (block_root, block, blobs) + (block_root, block, blobs, data_columns) } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index e7bb2034fcb..6c9964bdf86 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -217,7 +217,11 @@ impl PendingComponents { /// /// WARNING: This function can potentially take a lot of time if the state needs to be /// reconstructed from disk. Ensure you are not holding any write locks while calling this. - pub fn make_available(self, recover: R) -> Result, AvailabilityCheckError> + pub fn make_available( + self, + block_import_requirement: BlockImportRequirement, + recover: R, + ) -> Result, AvailabilityCheckError> where R: FnOnce( DietAvailabilityPendingExecutedBlock, @@ -226,7 +230,7 @@ impl PendingComponents { let Self { block_root, verified_blobs, - verified_data_columns: _, + verified_data_columns, executed_block, } = self; @@ -239,17 +243,29 @@ impl PendingComponents { let Some(diet_executed_block) = executed_block else { return Err(AvailabilityCheckError::Unexpected); }; - let num_blobs_expected = diet_executed_block.num_blobs_expected(); - let Some(verified_blobs) = verified_blobs - .into_iter() - .cloned() - .map(|b| b.map(|b| b.to_blob())) - .take(num_blobs_expected) - .collect::>>() - else { - return Err(AvailabilityCheckError::Unexpected); + + let (blobs, data_columns) = match block_import_requirement { + BlockImportRequirement::AllBlobs => { + let num_blobs_expected = diet_executed_block.num_blobs_expected(); + let Some(verified_blobs) = verified_blobs + .into_iter() + .cloned() + .map(|b| b.map(|b| b.to_blob())) + .take(num_blobs_expected) + .collect::>>() + else { + return Err(AvailabilityCheckError::Unexpected); + }; + (Some(VariableList::new(verified_blobs)?), None) + } + BlockImportRequirement::CustodyColumns(_) => { + let verified_data_columns = verified_data_columns + .into_iter() + .map(|d| d.into_inner()) + .collect(); + (None, Some(verified_data_columns)) + } }; - let verified_blobs = VariableList::new(verified_blobs)?; let executed_block = recover(diet_executed_block)?; @@ -262,7 +278,8 @@ impl PendingComponents { let available_block = AvailableBlock { block_root, block, - blobs: Some(verified_blobs), + blobs, + data_columns, blobs_available_timestamp, }; Ok(Availability::Available(Box::new( @@ -404,7 +421,7 @@ impl DataAvailabilityCheckerInner { write_lock.put(block_root, pending_components.clone()); // No need to hold the write lock anymore drop(write_lock); - pending_components.make_available(|diet_block| { + pending_components.make_available(block_import_requirement, |diet_block| { self.state_cache.recover_pending_executed_block(diet_block) }) } else { @@ -413,7 +430,7 @@ impl DataAvailabilityCheckerInner { } } - // TODO(das): gossip and rpc code paths to be implemented. + // TODO(das): rpc code paths to be implemented. #[allow(dead_code)] pub fn put_kzg_verified_data_columns< I: IntoIterator>, @@ -439,7 +456,7 @@ impl DataAvailabilityCheckerInner { write_lock.put(block_root, pending_components.clone()); // No need to hold the write lock anymore drop(write_lock); - pending_components.make_available(|diet_block| { + pending_components.make_available(block_import_requirement, |diet_block| { self.state_cache.recover_pending_executed_block(diet_block) }) } else { @@ -478,7 +495,7 @@ impl DataAvailabilityCheckerInner { write_lock.put(block_root, pending_components.clone()); // No need to hold the write lock anymore drop(write_lock); - pending_components.make_available(|diet_block| { + pending_components.make_available(block_import_requirement, |diet_block| { self.state_cache.recover_pending_executed_block(diet_block) }) } else { diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 53e83a80617..fa31d6f2e8e 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -190,6 +190,10 @@ impl GossipVerifiedDataColumn { pub fn signed_block_header(&self) -> SignedBeaconBlockHeader { self.data_column.data.signed_block_header.clone() } + + pub fn into_inner(self) -> KzgVerifiedDataColumn { + self.data_column + } } /// Wrapper over a `DataColumnSidecar` for which we have completed kzg verification. @@ -204,6 +208,9 @@ impl KzgVerifiedDataColumn { pub fn new(data_column: Arc>, kzg: &Kzg) -> Result { verify_kzg_for_data_column(data_column, kzg) } + pub fn to_data_column(self) -> Arc> { + self.data + } pub fn as_data_column(&self) -> &DataColumnSidecar { &self.data } @@ -226,9 +233,21 @@ pub struct KzgVerifiedCustodyDataColumn { } impl KzgVerifiedCustodyDataColumn { + /// Mark a column as custody column. Caller must ensure that our current custody requirements + /// include this column + pub fn from_asserted_custody(kzg_verified: KzgVerifiedDataColumn) -> Self { + Self { + data: kzg_verified.to_data_column(), + } + } + pub fn index(&self) -> ColumnIndex { self.data.index } + + pub fn into_inner(self) -> Arc> { + self.data + } } /// Complete kzg verification for a `DataColumnSidecar`. diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index dda699cc6c1..606610a7483 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -22,6 +22,7 @@ pub struct CacheItem { */ block: Arc>, blobs: Option>, + data_columns: Option>, proto_block: ProtoBlock, } @@ -69,7 +70,7 @@ impl EarlyAttesterCache { }, }; - let (_, block, blobs) = block.deconstruct(); + let (_, block, blobs, data_columns) = block.deconstruct(); let item = CacheItem { epoch, committee_lengths, @@ -78,6 +79,7 @@ impl EarlyAttesterCache { target, block, blobs, + data_columns, proto_block, }; @@ -164,6 +166,15 @@ impl EarlyAttesterCache { .and_then(|item| item.blobs.clone()) } + /// Returns the data columns, if `block_root` matches the cached item. + pub fn get_data_columns(&self, block_root: Hash256) -> Option> { + self.item + .read() + .as_ref() + .filter(|item| item.beacon_block_root == block_root) + .and_then(|item| item.data_columns.clone()) + } + /// Returns the proto-array block, if `block_root` matches the cached item. pub fn get_proto_block(&self, block_root: Hash256) -> Option { self.item diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 85208c8ad6f..aa2fac2afc8 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -107,7 +107,8 @@ impl BeaconChain { let mut signed_blocks = Vec::with_capacity(blocks_to_import.len()); for available_block in blocks_to_import.into_iter().rev() { - let (block_root, block, maybe_blobs) = available_block.deconstruct(); + let (block_root, block, maybe_blobs, maybe_data_columns) = + available_block.deconstruct(); if block_root != expected_block_root { return Err(HistoricalBlockError::MismatchedBlockRoot { @@ -127,6 +128,13 @@ impl BeaconChain { self.store .blobs_as_kv_store_ops(&block_root, blobs, &mut blob_batch); } + // Store the data columns too + if let Some(_data_columns) = maybe_data_columns { + // TODO(das): depends on https://github.com/sigp/lighthouse/pull/6073 + // new_oldest_data_column_slot = Some(block.slot()); + // self.store + // .data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch); + } // Store block roots, including at all skip slots in the freezer DB. for slot in (block.slot().as_usize()..prev_block_slot.as_usize()).rev() { diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 12f2702822e..01d7798b92c 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2542,10 +2542,10 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { // signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120. let mut batch_with_invalid_first_block = available_blocks.clone(); batch_with_invalid_first_block[0] = { - let (block_root, block, blobs) = available_blocks[0].clone().deconstruct(); + let (block_root, block, blobs, data_columns) = available_blocks[0].clone().deconstruct(); let mut corrupt_block = (*block).clone(); *corrupt_block.signature_mut() = Signature::empty(); - AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), blobs) + AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), blobs, data_columns) }; // Importing the invalid batch should error.