From 723a960cda43f99c93244f0f63edd89de3aca9c9 Mon Sep 17 00:00:00 2001 From: Mark Mackey Date: Thu, 9 May 2024 13:17:49 -0500 Subject: [PATCH 1/2] Fix Aggregation Pool for Electra --- beacon_node/beacon_chain/src/beacon_chain.rs | 23 +- .../src/naive_aggregation_pool.rs | 255 +++++++++++++++--- beacon_node/beacon_chain/src/test_utils.rs | 2 +- 3 files changed, 236 insertions(+), 44 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 973dcaadb47..81977aceb73 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1612,11 +1612,30 @@ impl BeaconChain { /// Returns an aggregated `Attestation`, if any, that has a matching `attestation.data`. /// /// The attestation will be obtained from `self.naive_aggregation_pool`. - pub fn get_aggregated_attestation( + pub fn get_aggregated_attestation_base( &self, data: &AttestationData, ) -> Result>, Error> { - if let Some(attestation) = self.naive_aggregation_pool.read().get(data) { + let attestation_key = crate::naive_aggregation_pool::AttestationKey::new_base(data.clone()); + if let Some(attestation) = self.naive_aggregation_pool.read().get(&attestation_key) { + self.filter_optimistic_attestation(attestation) + .map(Option::Some) + } else { + Ok(None) + } + } + + // TODO(electra): call this function from the new beacon API method + pub fn get_aggregated_attestation_electra( + &self, + data: &AttestationData, + committee_index: CommitteeIndex, + ) -> Result>, Error> { + let attestation_key = crate::naive_aggregation_pool::AttestationKey::new_electra( + data.clone(), + committee_index, + ); + if let Some(attestation) = self.naive_aggregation_pool.read().get(&attestation_key) { self.filter_optimistic_attestation(attestation) .map(Option::Some) } else { diff --git a/beacon_node/beacon_chain/src/naive_aggregation_pool.rs b/beacon_node/beacon_chain/src/naive_aggregation_pool.rs index a12521cd171..57415c734fc 100644 --- a/beacon_node/beacon_chain/src/naive_aggregation_pool.rs +++ b/beacon_node/beacon_chain/src/naive_aggregation_pool.rs @@ -1,7 +1,9 @@ use crate::metrics; use crate::observed_aggregates::AsReference; +use itertools::Itertools; +use smallvec::SmallVec; use std::collections::HashMap; -use tree_hash::TreeHash; +use tree_hash::{MerkleHasher, TreeHash, TreeHashType}; use types::consts::altair::SYNC_COMMITTEE_SUBNET_COUNT; use types::slot_data::SlotData; use types::sync_committee_contribution::SyncContributionData; @@ -9,9 +11,107 @@ use types::{ Attestation, AttestationData, AttestationRef, EthSpec, Hash256, Slot, SyncCommitteeContribution, }; -type AttestationDataRoot = Hash256; +type AttestationKeyRoot = Hash256; type SyncDataRoot = Hash256; +/// Post-Electra, we need a new key for Attestations that includes the committee index +#[derive(Debug, Clone, PartialEq)] +pub struct AttestationKey { + data: AttestationData, + committee_index: Option, + slot: Slot, +} + +// A custom implementation of `TreeHash` such that: +// AttestationKey(data, None).tree_hash_root() == data.tree_hash_root() +// AttestationKey(data, Some(index)).tree_hash_root() == (data, index).tree_hash_root() +// This is necessary because pre-Electra, the validator will ask for the tree_hash_root() +// of the `AttestationData` +impl TreeHash for AttestationKey { + fn tree_hash_type() -> TreeHashType { + TreeHashType::Container + } + + fn tree_hash_packed_encoding(&self) -> SmallVec<[u8; 32]> { + unreachable!("AttestationKey should never be packed.") + } + + fn tree_hash_packing_factor() -> usize { + unreachable!("AttestationKey should never be packed.") + } + + fn tree_hash_root(&self) -> Hash256 { + let data_hash = self.data.tree_hash_root(); + match self.committee_index { + None => data_hash, // Return just the data hash if no committee index is present + Some(index) => { + // Combine the hash of the data with the hash of the index + let mut hasher = MerkleHasher::with_leaves(2); + hasher + .write(data_hash.as_bytes()) + .expect("should write data hash"); + hasher + .write(&index.to_le_bytes()) + .expect("should write index"); + hasher.finish().expect("should give tree hash") + } + } + } +} + +impl AttestationKey { + pub fn from_attestation_ref(attestation: AttestationRef) -> Result { + let slot = attestation.data().slot; + match attestation { + AttestationRef::Base(att) => Ok(Self { + data: att.data.clone(), + committee_index: None, + slot, + }), + AttestationRef::Electra(att) => { + let committee_index = att + .committee_bits + .iter() + .enumerate() + .filter_map(|(i, bit)| if bit { Some(i) } else { None }) + .at_most_one() + .map_err(|_| Error::MoreThanOneCommitteeBitSet)? + .ok_or(Error::NoCommitteeBitSet)?; + + Ok(Self { + data: att.data.clone(), + committee_index: Some(committee_index as u64), + slot, + }) + } + } + } + + pub fn new_base(data: AttestationData) -> Self { + let slot = data.slot; + Self { + data, + committee_index: None, + slot, + } + } + + pub fn new_electra(data: AttestationData, committee_index: u64) -> Self { + let slot = data.slot; + Self { + data, + committee_index: Some(committee_index), + slot, + } + } +} + +impl SlotData for AttestationKey { + fn get_slot(&self) -> Slot { + self.slot + } +} + /// The number of slots that will be stored in the pool. /// /// For example, if `SLOTS_RETAINED == 3` and the pool is pruned at slot `6`, then all items @@ -49,6 +149,10 @@ pub enum Error { /// The given `aggregation_bits` field had more than one signature. The number of /// signatures found is included. MoreThanOneAggregationBitSet(usize), + /// The electra attestation has more than one committee bit set + MoreThanOneCommitteeBitSet, + /// The electra attestation has NO committee bit set + NoCommitteeBitSet, /// We have reached the maximum number of unique items that can be stored in a /// slot. This is a DoS protection function. ReachedMaxItemsPerSlot(usize), @@ -112,13 +216,13 @@ where /// A collection of `Attestation` objects, keyed by their `attestation.data`. Enforces that all /// `attestation` are from the same slot. pub struct AggregatedAttestationMap { - map: HashMap>, + map: HashMap>, } impl AggregateMap for AggregatedAttestationMap { - type Key = AttestationDataRoot; + type Key = AttestationKeyRoot; type Value = Attestation; - type Data = AttestationData; + type Data = AttestationKey; /// Create an empty collection with the given `initial_capacity`. fn new(initial_capacity: usize) -> Self { @@ -133,45 +237,43 @@ impl AggregateMap for AggregatedAttestationMap { fn insert(&mut self, a: AttestationRef) -> Result { let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_CORE_INSERT); - let set_bits = match a { + let aggregation_bit = match a { AttestationRef::Base(att) => att .aggregation_bits .iter() .enumerate() - .filter(|(_i, bit)| *bit) - .map(|(i, _bit)| i) - .collect::>(), + .filter_map(|(i, bit)| if bit { Some(i) } else { None }) + .at_most_one() + .map_err(|iter| Error::MoreThanOneAggregationBitSet(iter.count()))? + .ok_or(Error::NoAggregationBitsSet)?, AttestationRef::Electra(att) => att .aggregation_bits .iter() .enumerate() - .filter(|(_i, bit)| *bit) - .map(|(i, _bit)| i) - .collect::>(), + .filter_map(|(i, bit)| if bit { Some(i) } else { None }) + .at_most_one() + .map_err(|iter| Error::MoreThanOneAggregationBitSet(iter.count()))? + .ok_or(Error::NoAggregationBitsSet)?, }; - let committee_index = set_bits - .first() - .copied() - .ok_or(Error::NoAggregationBitsSet)?; - - if set_bits.len() > 1 { - return Err(Error::MoreThanOneAggregationBitSet(set_bits.len())); - } - - let attestation_data_root = a.data().tree_hash_root(); + let attestation_key = AttestationKey::from_attestation_ref(a)?; + let attestation_data_root = attestation_key.tree_hash_root(); if let Some(existing_attestation) = self.map.get_mut(&attestation_data_root) { if existing_attestation - .get_aggregation_bit(committee_index) + .get_aggregation_bit(aggregation_bit) .map_err(|_| Error::InconsistentBitfieldLengths)? { - Ok(InsertOutcome::SignatureAlreadyKnown { committee_index }) + Ok(InsertOutcome::SignatureAlreadyKnown { + committee_index: aggregation_bit, + }) } else { let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_AGGREGATION); existing_attestation.aggregate(a); - Ok(InsertOutcome::SignatureAggregated { committee_index }) + Ok(InsertOutcome::SignatureAggregated { + committee_index: aggregation_bit, + }) } } else { if self.map.len() >= MAX_ATTESTATIONS_PER_SLOT { @@ -180,7 +282,9 @@ impl AggregateMap for AggregatedAttestationMap { self.map .insert(attestation_data_root, a.clone_as_attestation()); - Ok(InsertOutcome::NewItemInserted { committee_index }) + Ok(InsertOutcome::NewItemInserted { + committee_index: aggregation_bit, + }) } } @@ -500,19 +604,30 @@ mod tests { use super::*; use ssz_types::BitList; use store::BitVector; + use tree_hash::TreeHash; use types::{ test_utils::{generate_deterministic_keypair, test_random_instance}, - Fork, Hash256, SyncCommitteeMessage, + Attestation, AttestationBase, AttestationElectra, Fork, Hash256, SyncCommitteeMessage, }; type E = types::MainnetEthSpec; - fn get_attestation(slot: Slot) -> Attestation { - let mut a: Attestation = test_random_instance(); - a.data_mut().slot = slot; - *a.aggregation_bits_base_mut().unwrap() = - BitList::with_capacity(4).expect("should create bitlist"); - a + fn get_attestation_base(slot: Slot) -> Attestation { + let mut a: AttestationBase = test_random_instance(); + a.data.slot = slot; + a.aggregation_bits = BitList::with_capacity(4).expect("should create bitlist"); + Attestation::Base(a) + } + + fn get_attestation_electra(slot: Slot) -> Attestation { + let mut a: AttestationElectra = test_random_instance(); + a.data.slot = slot; + a.aggregation_bits = BitList::with_capacity(4).expect("should create bitlist"); + a.committee_bits = BitVector::new(); + a.committee_bits + .set(0, true) + .expect("should set committee bit"); + Attestation::Electra(a) } fn get_sync_contribution(slot: Slot) -> SyncCommitteeContribution { @@ -555,10 +670,16 @@ mod tests { } fn unset_attestation_bit(a: &mut Attestation, i: usize) { - a.aggregation_bits_base_mut() - .unwrap() - .set(i, false) - .expect("should unset aggregation bit") + match a { + Attestation::Base(ref mut att) => att + .aggregation_bits + .set(i, false) + .expect("should unset aggregation bit"), + Attestation::Electra(ref mut att) => att + .aggregation_bits + .set(i, false) + .expect("should unset aggregation bit"), + } } fn unset_sync_contribution_bit(a: &mut SyncCommitteeContribution, i: usize) { @@ -579,8 +700,8 @@ mod tests { a.data().beacon_block_root == block_root } - fn key_from_attestation(a: &Attestation) -> AttestationData { - a.data().clone() + fn key_from_attestation(a: &Attestation) -> AttestationKey { + AttestationKey::from_attestation_ref(a.to_ref()).expect("should create attestation key") } fn mutate_sync_contribution_block_root( @@ -605,6 +726,45 @@ mod tests { SyncContributionData::from_contribution(a) } + #[test] + fn attestation_key_tree_hash_tests() { + let attestation_base = get_attestation_base(Slot::new(42)); + // for a base attestation, the tree_hash_root() of the key should be the same as the tree_hash_root() of the data + let attestation_key_base = AttestationKey::from_attestation_ref(attestation_base.to_ref()) + .expect("should create attestation key"); + assert_eq!( + attestation_key_base.tree_hash_root(), + attestation_base.data().tree_hash_root() + ); + let mut attestation_electra = get_attestation_electra(Slot::new(42)); + // for an electra attestation, the tree_hash_root() of the key should be different from the tree_hash_root() of the data + let attestation_key_electra = + AttestationKey::from_attestation_ref(attestation_electra.to_ref()) + .expect("should create attestation key"); + assert_ne!( + attestation_key_electra.tree_hash_root(), + attestation_electra.data().tree_hash_root() + ); + // for an electra attestation, the tree_hash_root() of the key should be dependent on which committee bit is set + let committe_bits = attestation_electra + .committee_bits_mut() + .expect("should get committee bits"); + committe_bits + .set(0, false) + .expect("should set committee bit"); + committe_bits + .set(1, true) + .expect("should set committee bit"); + let new_attestation_key_electra = + AttestationKey::from_attestation_ref(attestation_electra.to_ref()) + .expect("should create attestation key"); + // this new key should have a different tree_hash_root() than the previous key + assert_ne!( + attestation_key_electra.tree_hash_root(), + new_attestation_key_electra.tree_hash_root() + ); + } + macro_rules! test_suite { ( $mod_name: ident, @@ -800,8 +960,21 @@ mod tests { } test_suite! { - attestation_tests, - get_attestation, + attestation_tests_base, + get_attestation_base, + sign_attestation, + unset_attestation_bit, + mutate_attestation_block_root, + mutate_attestation_slot, + attestation_block_root_comparator, + key_from_attestation, + AggregatedAttestationMap, + MAX_ATTESTATIONS_PER_SLOT + } + + test_suite! { + attestation_tests_electra, + get_attestation_electra, sign_attestation, unset_attestation_bit, mutate_attestation_block_root, diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index edfff4bf81e..bcf7582ebfd 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1374,7 +1374,7 @@ where // aggregate locally. let aggregate = self .chain - .get_aggregated_attestation(attestation.data()) + .get_aggregated_attestation_base(attestation.data()) .unwrap() .unwrap_or_else(|| { committee_attestations.iter().skip(1).fold( From af7c3208a82eb33cf46819b25a8d328a509446cb Mon Sep 17 00:00:00 2001 From: Mark Mackey Date: Thu, 9 May 2024 14:10:50 -0500 Subject: [PATCH 2/2] Remove Outdated Interface --- beacon_node/beacon_chain/src/beacon_chain.rs | 25 +++++----- .../src/naive_aggregation_pool.rs | 47 +++++++------------ .../tests/payload_invalidation.rs | 5 +- beacon_node/http_api/src/lib.rs | 2 +- consensus/types/src/attestation.rs | 1 + 5 files changed, 34 insertions(+), 46 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 81977aceb73..331e04069fd 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1616,7 +1616,7 @@ impl BeaconChain { &self, data: &AttestationData, ) -> Result>, Error> { - let attestation_key = crate::naive_aggregation_pool::AttestationKey::new_base(data.clone()); + let attestation_key = crate::naive_aggregation_pool::AttestationKey::new_base(data); if let Some(attestation) = self.naive_aggregation_pool.read().get(&attestation_key) { self.filter_optimistic_attestation(attestation) .map(Option::Some) @@ -1631,10 +1631,8 @@ impl BeaconChain { data: &AttestationData, committee_index: CommitteeIndex, ) -> Result>, Error> { - let attestation_key = crate::naive_aggregation_pool::AttestationKey::new_electra( - data.clone(), - committee_index, - ); + let attestation_key = + crate::naive_aggregation_pool::AttestationKey::new_electra(data, committee_index); if let Some(attestation) = self.naive_aggregation_pool.read().get(&attestation_key) { self.filter_optimistic_attestation(attestation) .map(Option::Some) @@ -1647,16 +1645,21 @@ impl BeaconChain { /// `attestation.data.tree_hash_root()`. /// /// The attestation will be obtained from `self.naive_aggregation_pool`. - pub fn get_aggregated_attestation_by_slot_and_root( + /// + /// NOTE: This function will *only* work with pre-electra attestations and it only + /// exists to support the pre-electra validator API method. + pub fn get_pre_electra_aggregated_attestation_by_slot_and_root( &self, slot: Slot, attestation_data_root: &Hash256, ) -> Result>, Error> { - if let Some(attestation) = self - .naive_aggregation_pool - .read() - .get_by_slot_and_root(slot, attestation_data_root) - { + let attestation_key = + crate::naive_aggregation_pool::AttestationKey::new_base_from_slot_and_root( + slot, + *attestation_data_root, + ); + + if let Some(attestation) = self.naive_aggregation_pool.read().get(&attestation_key) { self.filter_optimistic_attestation(attestation) .map(Option::Some) } else { diff --git a/beacon_node/beacon_chain/src/naive_aggregation_pool.rs b/beacon_node/beacon_chain/src/naive_aggregation_pool.rs index 57415c734fc..6c4f7cdae72 100644 --- a/beacon_node/beacon_chain/src/naive_aggregation_pool.rs +++ b/beacon_node/beacon_chain/src/naive_aggregation_pool.rs @@ -17,7 +17,7 @@ type SyncDataRoot = Hash256; /// Post-Electra, we need a new key for Attestations that includes the committee index #[derive(Debug, Clone, PartialEq)] pub struct AttestationKey { - data: AttestationData, + data_root: Hash256, committee_index: Option, slot: Slot, } @@ -41,14 +41,13 @@ impl TreeHash for AttestationKey { } fn tree_hash_root(&self) -> Hash256 { - let data_hash = self.data.tree_hash_root(); match self.committee_index { - None => data_hash, // Return just the data hash if no committee index is present + None => self.data_root, // Return just the data root if no committee index is present Some(index) => { // Combine the hash of the data with the hash of the index let mut hasher = MerkleHasher::with_leaves(2); hasher - .write(data_hash.as_bytes()) + .write(self.data_root.as_bytes()) .expect("should write data hash"); hasher .write(&index.to_le_bytes()) @@ -64,7 +63,7 @@ impl AttestationKey { let slot = attestation.data().slot; match attestation { AttestationRef::Base(att) => Ok(Self { - data: att.data.clone(), + data_root: att.data.tree_hash_root(), committee_index: None, slot, }), @@ -79,7 +78,7 @@ impl AttestationKey { .ok_or(Error::NoCommitteeBitSet)?; Ok(Self { - data: att.data.clone(), + data_root: att.data.tree_hash_root(), committee_index: Some(committee_index as u64), slot, }) @@ -87,23 +86,31 @@ impl AttestationKey { } } - pub fn new_base(data: AttestationData) -> Self { + pub fn new_base(data: &AttestationData) -> Self { let slot = data.slot; Self { - data, + data_root: data.tree_hash_root(), committee_index: None, slot, } } - pub fn new_electra(data: AttestationData, committee_index: u64) -> Self { + pub fn new_electra(data: &AttestationData, committee_index: u64) -> Self { let slot = data.slot; Self { - data, + data_root: data.tree_hash_root(), committee_index: Some(committee_index), slot, } } + + pub fn new_base_from_slot_and_root(slot: Slot, data_root: Hash256) -> Self { + Self { + data_root, + committee_index: None, + slot, + } + } } impl SlotData for AttestationKey { @@ -194,9 +201,6 @@ where /// Get a reference to the inner `HashMap`. fn get_map(&self) -> &HashMap; - /// Get a `Value` from `Self` based on `Key`, which is a hash of `Data`. - fn get_by_root(&self, root: &Self::Key) -> Option<&Self::Value>; - /// The number of items store in `Self`. fn len(&self) -> usize; @@ -299,11 +303,6 @@ impl AggregateMap for AggregatedAttestationMap { &self.map } - /// Returns an aggregated `Attestation` with the given `root`, if any. - fn get_by_root(&self, root: &Self::Key) -> Option<&Self::Value> { - self.map.get(root) - } - fn len(&self) -> usize { self.map.len() } @@ -410,11 +409,6 @@ impl AggregateMap for SyncContributionAggregateMap { &self.map } - /// Returns an aggregated `SyncCommitteeContribution` with the given `root`, if any. - fn get_by_root(&self, root: &SyncDataRoot) -> Option<&SyncCommitteeContribution> { - self.map.get(root) - } - fn len(&self) -> usize { self.map.len() } @@ -549,13 +543,6 @@ where .and_then(|map| map.get(data)) } - /// Returns an aggregated `T::Value` with the given `slot` and `root`, if any. - pub fn get_by_slot_and_root(&self, slot: Slot, root: &T::Key) -> Option { - self.maps - .get(&slot) - .and_then(|map| map.get_by_root(root).cloned()) - } - /// Iterate all items in all slots of `self`. pub fn iter(&self) -> impl Iterator { self.maps.values().flat_map(|map| map.get_map().values()) diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 8c9957db169..594872e2fff 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -1228,10 +1228,7 @@ async fn attesting_to_optimistic_head() { let get_aggregated_by_slot_and_root = || { rig.harness .chain - .get_aggregated_attestation_by_slot_and_root( - attestation.data().slot, - &attestation.data().tree_hash_root(), - ) + .get_aggregated_attestation_base(attestation.data()) }; /* diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 6cb8f6fe0b9..838f7233052 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -3191,7 +3191,7 @@ pub fn serve( task_spawner.blocking_json_task(Priority::P0, move || { not_synced_filter?; chain - .get_aggregated_attestation_by_slot_and_root( + .get_pre_electra_aggregated_attestation_by_slot_and_root( query.slot, &query.attestation_data_root, ) diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index bcecfde10e4..6d5cb5bd1a9 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -92,6 +92,7 @@ impl Decode for Attestation { } } +// TODO(electra): think about how to handle fork variants here impl TestRandom for Attestation { fn random_for_test(rng: &mut impl RngCore) -> Self { let aggregation_bits: BitList = BitList::random_for_test(rng);