Skip to content

Commit

Permalink
Attribute invalid column proof error to correct peer (sigp#6377)
Browse files Browse the repository at this point in the history
* Attribute invalid column proof error to correct peer

* Update beacon_node/beacon_chain/src/data_availability_checker.rs

Co-authored-by: Pawan Dhananjay <pawandhananjay@gmail.com>

* fix conflicts
  • Loading branch information
dapplion authored Sep 23, 2024
1 parent 012e7e7 commit d84df57
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 28 deletions.
54 changes: 43 additions & 11 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ mod overflow_lru_cache;
mod state_lru_cache;

use crate::data_column_verification::{
verify_kzg_for_data_column_list, CustodyDataColumn, GossipVerifiedDataColumn,
KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn,
verify_kzg_for_data_column, verify_kzg_for_data_column_list, CustodyDataColumn,
GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn,
};
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
use types::non_zero_usize::new_non_zero_usize;
Expand Down Expand Up @@ -195,12 +195,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.now_duration()
.ok_or(AvailabilityCheckError::SlotClockError)?;

// Note: currently not reporting which specific blob is invalid because we fetch all blobs
// from the same peer for both lookup and range sync.

let verified_blobs = KzgVerifiedBlobList::new(
Vec::from(blobs).into_iter().flatten(),
&self.kzg,
seen_timestamp,
)
.map_err(AvailabilityCheckError::Kzg)?;
.map_err(AvailabilityCheckError::InvalidBlobs)?;

self.availability_cache
.put_kzg_verified_blobs(block_root, epoch, verified_blobs)
Expand All @@ -217,13 +220,15 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
) -> Result<(Availability<T::EthSpec>, DataColumnsToPublish<T::EthSpec>), AvailabilityCheckError>
{
// TODO(das): report which column is invalid for proper peer scoring
// TODO(das): batch KZG verification here
// TODO(das): batch KZG verification here, but fallback into checking each column
// individually to report which column(s) are invalid.
let verified_custody_columns = custody_columns
.into_iter()
.map(|column| {
let index = column.index;
Ok(KzgVerifiedCustodyDataColumn::from_asserted_custody(
KzgVerifiedDataColumn::new(column, &self.kzg)
.map_err(AvailabilityCheckError::Kzg)?,
.map_err(|e| AvailabilityCheckError::InvalidColumn(index, e))?,
))
})
.collect::<Result<Vec<_>, AvailabilityCheckError>>()?;
Expand Down Expand Up @@ -308,7 +313,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
if self.blobs_required_for_block(&block) {
return if let Some(blob_list) = blobs.as_ref() {
verify_kzg_for_blob_list(blob_list.iter(), &self.kzg)
.map_err(AvailabilityCheckError::Kzg)?;
.map_err(AvailabilityCheckError::InvalidBlobs)?;
Ok(MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
Expand All @@ -323,13 +328,12 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
}
if self.data_columns_required_for_block(&block) {
return if let Some(data_column_list) = data_columns.as_ref() {
verify_kzg_for_data_column_list(
verify_kzg_for_data_column_list_with_scoring(
data_column_list
.iter()
.map(|custody_column| custody_column.as_data_column()),
&self.kzg,
)
.map_err(AvailabilityCheckError::Kzg)?;
)?;
Ok(MaybeAvailableBlock::Available(AvailableBlock {
block_root,
block,
Expand Down Expand Up @@ -380,7 +384,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {

// verify kzg for all blobs at once
if !all_blobs.is_empty() {
verify_kzg_for_blob_list(all_blobs.iter(), &self.kzg)?;
verify_kzg_for_blob_list(all_blobs.iter(), &self.kzg)
.map_err(AvailabilityCheckError::InvalidBlobs)?;
}

let all_data_columns = blocks
Expand All @@ -396,7 +401,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {

// verify kzg for all data columns at once
if !all_data_columns.is_empty() {
verify_kzg_for_data_column_list(all_data_columns.iter(), &self.kzg)?;
// TODO: Need to also attribute which specific block is faulty
verify_kzg_for_data_column_list_with_scoring(all_data_columns.iter(), &self.kzg)?;
}

for block in blocks {
Expand Down Expand Up @@ -598,6 +604,32 @@ async fn availability_cache_maintenance_service<T: BeaconChainTypes>(
}
}

fn verify_kzg_for_data_column_list_with_scoring<'a, E: EthSpec, I>(
data_column_iter: I,
kzg: &'a Kzg,
) -> Result<(), AvailabilityCheckError>
where
I: Iterator<Item = &'a Arc<DataColumnSidecar<E>>> + Clone,
{
let Err(batch_err) = verify_kzg_for_data_column_list(data_column_iter.clone(), kzg) else {
return Ok(());
};

let data_columns = data_column_iter.collect::<Vec<_>>();
// Find which column is invalid. If len is 1 or 0 continue to default case below.
// If len > 1 at least one column MUST fail.
if data_columns.len() > 1 {
for data_column in data_columns {
if let Err(e) = verify_kzg_for_data_column(data_column.clone(), kzg) {
return Err(AvailabilityCheckError::InvalidColumn(data_column.index, e));
}
}
}

// len 0 should never happen
Err(AvailabilityCheckError::InvalidColumn(0, batch_err))
}

/// A fully available block that is ready to be imported into fork choice.
#[derive(Clone, Debug, PartialEq)]
pub struct AvailableBlock<E: EthSpec> {
Expand Down
20 changes: 8 additions & 12 deletions beacon_node/beacon_chain/src/data_availability_checker/error.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use kzg::{Error as KzgError, KzgCommitment};
use types::{BeaconStateError, Hash256};
use types::{BeaconStateError, ColumnIndex, Hash256};

#[derive(Debug)]
pub enum Error {
Kzg(KzgError),
KzgVerificationFailed,
InvalidBlobs(KzgError),
InvalidColumn(ColumnIndex, KzgError),
ReconstructColumnsError(KzgError),
KzgCommitmentMismatch {
blob_commitment: KzgCommitment,
block_commitment: KzgCommitment,
Expand Down Expand Up @@ -46,11 +47,12 @@ impl Error {
| Error::UnableToDetermineImportRequirement
| Error::RebuildingStateCaches(_)
| Error::SlotClockError => ErrorCategory::Internal,
Error::Kzg(_)
Error::InvalidBlobs { .. }
| Error::InvalidColumn { .. }
| Error::ReconstructColumnsError { .. }
| Error::BlobIndexInvalid(_)
| Error::DataColumnIndexInvalid(_)
| Error::KzgCommitmentMismatch { .. }
| Error::KzgVerificationFailed => ErrorCategory::Malicious,
| Error::KzgCommitmentMismatch { .. } => ErrorCategory::Malicious,
}
}
}
Expand Down Expand Up @@ -78,9 +80,3 @@ impl From<state_processing::BlockReplayError> for Error {
Self::BlockReplayError(value)
}
}

impl From<KzgError> for Error {
fn from(value: KzgError) -> Self {
Self::Kzg(value)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,8 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
kzg,
pending_components.verified_data_columns.as_slice(),
&self.spec,
)?;
)
.map_err(AvailabilityCheckError::ReconstructColumnsError)?;

let data_columns_to_publish = all_data_columns
.iter()
Expand Down
16 changes: 13 additions & 3 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use crate::metrics;
use crate::sync::block_lookups::common::ResponseType;
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory;
use beacon_chain::data_availability_checker::{
AvailabilityCheckError, AvailabilityCheckErrorCategory,
};
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
pub use common::RequestState;
use fnv::FnvHashMap;
Expand Down Expand Up @@ -591,8 +593,16 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
other => {
debug!(self.log, "Invalid lookup component"; "block_root" => ?block_root, "component" => ?R::response_type(), "error" => ?other);
let peer_group = request_state.on_processing_failure()?;
// TOOD(das): only downscore peer subgroup that provided the invalid proof
for peer in peer_group.all() {
let peers_to_penalize: Vec<_> = match other {
// Note: currenlty only InvalidColumn errors have index granularity,
// but future errors may follow the same pattern. Generalize this
// pattern with https://github.com/sigp/lighthouse/pull/6321
BlockError::AvailabilityCheck(
AvailabilityCheckError::InvalidColumn(index, _),
) => peer_group.of_index(index as usize).collect(),
_ => peer_group.all().collect(),
};
for peer in peers_to_penalize {
cx.report_peer(
*peer,
PeerAction::MidToleranceError,
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2449,7 +2449,7 @@ mod deneb_only {
self.rig.single_blob_component_processed(
self.blob_req_id.expect("blob request id").lookup_id,
BlockProcessingResult::Err(BlockError::AvailabilityCheck(
AvailabilityCheckError::KzgVerificationFailed,
AvailabilityCheckError::InvalidBlobs(kzg::Error::KzgVerificationFailed),
)),
);
self.rig.assert_single_lookups_count(1);
Expand Down
9 changes: 9 additions & 0 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ impl PeerGroup {
pub fn all(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.peers.keys()
}
pub fn of_index(&self, index: usize) -> impl Iterator<Item = &PeerId> + '_ {
self.peers.iter().filter_map(move |(peer, indices)| {
if indices.contains(&index) {
Some(peer)
} else {
None
}
})
}
}

/// Sequential ID that uniquely identifies ReqResp outgoing requests
Expand Down

0 comments on commit d84df57

Please sign in to comment.