diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index 26fb46ef7f8..c13593d7afc 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -24,8 +24,8 @@ mod overflow_lru_cache;
 mod state_lru_cache;
 
 use crate::data_column_verification::{
-    verify_kzg_for_data_column_list, CustodyDataColumn, GossipVerifiedDataColumn,
-    KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn,
+    verify_kzg_for_data_column, verify_kzg_for_data_column_list, CustodyDataColumn,
+    GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn,
 };
 pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
 use types::non_zero_usize::new_non_zero_usize;
@@ -195,12 +195,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
             .now_duration()
             .ok_or(AvailabilityCheckError::SlotClockError)?;
 
+        // Note: currently not reporting which specific blob is invalid because we fetch all blobs
+        // from the same peer for both lookup and range sync.
+
         let verified_blobs = KzgVerifiedBlobList::new(
             Vec::from(blobs).into_iter().flatten(),
             &self.kzg,
             seen_timestamp,
         )
-        .map_err(AvailabilityCheckError::Kzg)?;
+        .map_err(AvailabilityCheckError::InvalidBlobs)?;
 
         self.availability_cache
             .put_kzg_verified_blobs(block_root, epoch, verified_blobs)
@@ -217,13 +220,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
     ) -> Result<(Availability<T::EthSpec>, DataColumnsToPublish<T::EthSpec>), AvailabilityCheckError> {
         // TODO(das): report which column is invalid for proper peer scoring
-        // TODO(das): batch KZG verification here
+        // TODO(das): batch KZG verification here, but fallback into checking each column
+        // individually to report which column(s) are invalid.
 
         let verified_custody_columns = custody_columns
             .into_iter()
             .map(|column| {
+                let index = column.index;
                 Ok(KzgVerifiedCustodyDataColumn::from_asserted_custody(
                     KzgVerifiedDataColumn::new(column, &self.kzg)
-                        .map_err(AvailabilityCheckError::Kzg)?,
+                        .map_err(|e| AvailabilityCheckError::InvalidColumn(index, e))?,
                 ))
             })
             .collect::<Result<Vec<_>, AvailabilityCheckError>>()?;
@@ -308,7 +313,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
         if self.blobs_required_for_block(&block) {
             return if let Some(blob_list) = blobs.as_ref() {
                 verify_kzg_for_blob_list(blob_list.iter(), &self.kzg)
-                    .map_err(AvailabilityCheckError::Kzg)?;
+                    .map_err(AvailabilityCheckError::InvalidBlobs)?;
                 Ok(MaybeAvailableBlock::Available(AvailableBlock {
                     block_root,
                     block,
@@ -323,13 +328,12 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
         }
         if self.data_columns_required_for_block(&block) {
             return if let Some(data_column_list) = data_columns.as_ref() {
-                verify_kzg_for_data_column_list(
+                verify_kzg_for_data_column_list_with_scoring(
                     data_column_list
                         .iter()
                         .map(|custody_column| custody_column.as_data_column()),
                     &self.kzg,
-                )
-                .map_err(AvailabilityCheckError::Kzg)?;
+                )?;
                 Ok(MaybeAvailableBlock::Available(AvailableBlock {
                     block_root,
                     block,
@@ -380,7 +384,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
 
         // verify kzg for all blobs at once
         if !all_blobs.is_empty() {
-            verify_kzg_for_blob_list(all_blobs.iter(), &self.kzg)?;
+            verify_kzg_for_blob_list(all_blobs.iter(), &self.kzg)
+                .map_err(AvailabilityCheckError::InvalidBlobs)?;
         }
 
         let all_data_columns = blocks
@@ -396,7 +401,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
 
         // verify kzg for all data columns at once
         if !all_data_columns.is_empty() {
-            verify_kzg_for_data_column_list(all_data_columns.iter(), &self.kzg)?;
+            // TODO: Need to also attribute which specific block is faulty
+            verify_kzg_for_data_column_list_with_scoring(all_data_columns.iter(), &self.kzg)?;
         }
 
         for block in blocks {
@@ -598,6 +604,32 @@ async fn availability_cache_maintenance_service<T: BeaconChainTypes>(
     }
 }
 
+fn verify_kzg_for_data_column_list_with_scoring<'a, E: EthSpec, I>(
+    data_column_iter: I,
+    kzg: &'a Kzg,
+) -> Result<(), AvailabilityCheckError>
+where
+    I: Iterator<Item = &'a Arc<DataColumnSidecar<E>>> + Clone,
+{
+    let Err(batch_err) = verify_kzg_for_data_column_list(data_column_iter.clone(), kzg) else {
+        return Ok(());
+    };
+
+    let data_columns = data_column_iter.collect::<Vec<_>>();
+    // Find which column is invalid. If len is 1 or 0 continue to default case below.
+    // If len > 1 at least one column MUST fail.
+    if data_columns.len() > 1 {
+        for data_column in data_columns {
+            if let Err(e) = verify_kzg_for_data_column(data_column.clone(), kzg) {
+                return Err(AvailabilityCheckError::InvalidColumn(data_column.index, e));
+            }
+        }
+    }
+
+    // len 0 should never happen
+    Err(AvailabilityCheckError::InvalidColumn(0, batch_err))
+}
+
 /// A fully available block that is ready to be imported into fork choice.
 #[derive(Clone, Debug, PartialEq)]
 pub struct AvailableBlock<E: EthSpec> {
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs
index 7f34cacefe3..dbfa00e6e22 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs
@@ -1,10 +1,11 @@
 use kzg::{Error as KzgError, KzgCommitment};
-use types::{BeaconStateError, Hash256};
+use types::{BeaconStateError, ColumnIndex, Hash256};
 
 #[derive(Debug)]
 pub enum Error {
-    Kzg(KzgError),
-    KzgVerificationFailed,
+    InvalidBlobs(KzgError),
+    InvalidColumn(ColumnIndex, KzgError),
+    ReconstructColumnsError(KzgError),
     KzgCommitmentMismatch {
         blob_commitment: KzgCommitment,
         block_commitment: KzgCommitment,
@@ -46,11 +47,12 @@ impl Error {
             | Error::UnableToDetermineImportRequirement
             | Error::RebuildingStateCaches(_)
             | Error::SlotClockError => ErrorCategory::Internal,
-            Error::Kzg(_)
+            Error::InvalidBlobs { .. }
+            | Error::InvalidColumn { .. }
+            | Error::ReconstructColumnsError { .. }
             | Error::BlobIndexInvalid(_)
             | Error::DataColumnIndexInvalid(_)
-            | Error::KzgCommitmentMismatch { .. }
-            | Error::KzgVerificationFailed => ErrorCategory::Malicious,
+            | Error::KzgCommitmentMismatch { .. } => ErrorCategory::Malicious,
         }
     }
 }
@@ -78,9 +80,3 @@ impl From<BlockReplayError> for Error {
         Self::BlockReplayError(value)
     }
 }
-
-impl From<KzgError> for Error {
-    fn from(value: KzgError) -> Self {
-        Self::Kzg(value)
-    }
-}
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
index 36c5a9359dd..05f8da4eed9 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -555,7 +555,8 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
                 kzg,
                 pending_components.verified_data_columns.as_slice(),
                 &self.spec,
-            )?;
+            )
+            .map_err(AvailabilityCheckError::ReconstructColumnsError)?;
 
             let data_columns_to_publish = all_data_columns
                 .iter()
diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs
index e31adb783c9..9abcd263de6 100644
--- a/beacon_node/network/src/sync/block_lookups/mod.rs
+++ b/beacon_node/network/src/sync/block_lookups/mod.rs
@@ -29,7 +29,9 @@ use crate::metrics;
 use crate::sync::block_lookups::common::ResponseType;
 use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
 use beacon_chain::block_verification_types::AsBlock;
-use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory;
+use beacon_chain::data_availability_checker::{
+    AvailabilityCheckError, AvailabilityCheckErrorCategory,
+};
 use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
 pub use common::RequestState;
 use fnv::FnvHashMap;
@@ -591,8 +593,16 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
             other => {
                 debug!(self.log, "Invalid lookup component"; "block_root" => ?block_root, "component" => ?R::response_type(), "error" => ?other);
                 let peer_group = request_state.on_processing_failure()?;
-                // TOOD(das): only downscore peer subgroup that provided the invalid proof
-                for peer in peer_group.all() {
+                let peers_to_penalize: Vec<_> = match other {
+                    // Note: currently only InvalidColumn errors have index granularity,
+                    // but future errors may follow the same pattern. Generalize this
+                    // pattern with https://github.com/sigp/lighthouse/pull/6321
+                    BlockError::AvailabilityCheck(
+                        AvailabilityCheckError::InvalidColumn(index, _),
+                    ) => peer_group.of_index(index as usize).collect(),
+                    _ => peer_group.all().collect(),
+                };
+                for peer in peers_to_penalize {
                     cx.report_peer(
                         *peer,
                         PeerAction::MidToleranceError,
diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs
index 5b4f17ac0dd..5aa1d5c2902 100644
--- a/beacon_node/network/src/sync/block_lookups/tests.rs
+++ b/beacon_node/network/src/sync/block_lookups/tests.rs
@@ -2449,7 +2449,7 @@ mod deneb_only {
             self.rig.single_blob_component_processed(
                 self.blob_req_id.expect("blob request id").lookup_id,
                 BlockProcessingResult::Err(BlockError::AvailabilityCheck(
-                    AvailabilityCheckError::KzgVerificationFailed,
+                    AvailabilityCheckError::InvalidBlobs(kzg::Error::KzgVerificationFailed),
                 )),
             );
             self.rig.assert_single_lookups_count(1);
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index d6f54178491..07d04b3fb20 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -139,6 +139,15 @@ impl PeerGroup {
     pub fn all(&self) -> impl Iterator<Item = &PeerId> + '_ {
         self.peers.keys()
     }
+    pub fn of_index(&self, index: usize) -> impl Iterator<Item = &PeerId> + '_ {
+        self.peers.iter().filter_map(move |(peer, indices)| {
+            if indices.contains(&index) {
+                Some(peer)
+            } else {
+                None
+            }
+        })
+    }
 }
 
 /// Sequential ID that uniquely identifies ReqResp outgoing requests
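
Illustrative sketch (not part of the patch above): the new verify_kzg_for_data_column_list_with_scoring helper follows a batch-then-fallback shape, verifying the whole list first and only re-checking items one by one when the batch fails, so the caller learns which index was invalid. The standalone Rust below restates that control flow with hypothetical Item, verify_one, and verify_batch stand-ins; none of these names exist in Lighthouse, and real KZG verification is replaced by a boolean flag.

// Hypothetical stand-ins for an indexed item and its verification error.
#[derive(Debug)]
struct VerifyError;

struct Item {
    index: u64,
    valid: bool,
}

fn verify_one(item: &Item) -> Result<(), VerifyError> {
    if item.valid { Ok(()) } else { Err(VerifyError) }
}

fn verify_batch(items: &[Item]) -> Result<(), VerifyError> {
    items.iter().try_for_each(verify_one)
}

/// Verify the batch first; only on failure re-check each item so the error
/// can be attributed to a specific index.
fn verify_with_attribution(items: &[Item]) -> Result<(), (u64, VerifyError)> {
    let Err(batch_err) = verify_batch(items) else {
        return Ok(());
    };
    if items.len() > 1 {
        for item in items {
            if let Err(e) = verify_one(item) {
                return Err((item.index, e));
            }
        }
    }
    // A single-item (or empty) batch: attribute the batch error to the only candidate.
    Err((items.first().map(|i| i.index).unwrap_or(0), batch_err))
}

fn main() {
    let items = vec![
        Item { index: 3, valid: true },
        Item { index: 7, valid: false },
    ];
    // Prints Err((7, VerifyError)): the failing index rather than a blanket batch failure.
    println!("{:?}", verify_with_attribution(&items));
}

The per-item pass only runs after a peer has already sent invalid data, so the happy path keeps the cheaper batch verification.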