Skip to content

Commit

Permalink
Drop beacon_chain pubkey to index map cache
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Feb 26, 2024
1 parent f08e8f5 commit e741cae
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 170 deletions.
44 changes: 0 additions & 44 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1462,50 +1462,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.state_at_slot(self.slot()?, StateSkipConfig::WithStateRoots)
}

/// Returns the validator index (if any) for the given public key.
///
/// ## Notes
///
/// This query uses the `validator_pubkey_cache` which contains _all_ validators ever seen,
/// even if those validators aren't included in the head state. It is important to remember
/// that just because a validator exists here, it doesn't necessarily exist in all
/// `BeaconStates`.
///
/// ## Errors
///
/// May return an error if acquiring a read-lock on the `validator_pubkey_cache` times out.
pub fn validator_index(&self, pubkey: &PublicKeyBytes) -> Result<Option<usize>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;

Ok(pubkey_cache.get_index(pubkey))
}

/// Return the validator indices of all public keys fetched from an iterator.
///
/// If any public key doesn't belong to a known validator then an error will be returned.
/// We could consider relaxing this by returning `Vec<Option<usize>>` in future.
pub fn validator_indices<'a>(
&self,
validator_pubkeys: impl Iterator<Item = &'a PublicKeyBytes>,
) -> Result<Vec<u64>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;

validator_pubkeys
.map(|pubkey| {
pubkey_cache
.get_index(pubkey)
.map(|id| id as u64)
.ok_or(Error::ValidatorPubkeyUnknown(*pubkey))
})
.collect()
}

/// Returns the validator pubkey (if any) for the given validator index.
///
/// ## Notes
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2083,7 +2083,7 @@ fn get_signature_verifier<'a, T: BeaconChainTypes>(

let decompressor = move |pk_bytes| {
// Map compressed pubkey to validator index.
let validator_index = validator_pubkey_cache.get_index(pk_bytes)?;
let validator_index = state.get_validator_index_readonly(pk_bytes)?;
// Map validator index to pubkey (respecting guard on unknown validators).
get_pubkey(validator_index)
};
Expand Down
39 changes: 26 additions & 13 deletions beacon_node/beacon_chain/src/sync_committee_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,22 @@ impl<T: BeaconChainTypes> VerifiedSyncContribution<T> {
.filter_map(|(pubkey, bit)| bit.then_some(pubkey))
.collect::<Vec<_>>();

let participant_indices = {
// TODO(lion): okay to get head state here?
let head_state = &chain.head_snapshot().beacon_state;

participant_pubkeys
.iter()
.map(|pubkey| {
head_state
.get_validator_index_readonly(pubkey)
.ok_or(Error::UnknownValidatorPubkey(*pubkey))
})
.collect::<Result<Vec<_>, _>>()?
};

// Ensure that all signatures are valid.
if !verify_signed_aggregate_signatures(
chain,
&signed_aggregate,
participant_pubkeys.as_slice(),
)? {
if !verify_signed_aggregate_signatures(chain, &signed_aggregate, &participant_indices)? {
return Err(Error::InvalidSignature);
}

Expand Down Expand Up @@ -617,7 +627,7 @@ pub fn verify_propagation_slot_range<S: SlotClock, U: SlotData>(
pub fn verify_signed_aggregate_signatures<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
signed_aggregate: &SignedContributionAndProof<T::EthSpec>,
participant_pubkeys: &[PublicKeyBytes],
participant_indexes: &[usize],
) -> Result<bool, Error> {
let pubkey_cache = chain
.validator_pubkey_cache
Expand Down Expand Up @@ -651,12 +661,8 @@ pub fn verify_signed_aggregate_signatures<T: BeaconChainTypes>(
)
.map_err(BeaconChainError::SignatureSetError)?,
sync_committee_contribution_signature_set_from_pubkeys::<T::EthSpec, _>(
|validator_index| {
pubkey_cache
.get_pubkey_from_pubkey_bytes(validator_index)
.map(Cow::Borrowed)
},
participant_pubkeys,
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
participant_indexes,
&signed_aggregate.message.contribution.signature,
signed_aggregate
.message
Expand All @@ -683,13 +689,20 @@ pub fn verify_sync_committee_message<T: BeaconChainTypes>(
let signature_setup_timer =
metrics::start_timer(&metrics::SYNC_MESSAGE_PROCESSING_SIGNATURE_SETUP_TIMES);

let index = chain
.head_snapshot()
.beacon_state
.pubkey_cache()
.get(pubkey_bytes)
.ok_or(Error::UnknownValidatorPubkey(*pubkey_bytes))?;

let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;

let pubkey = pubkey_cache
.get_pubkey_from_pubkey_bytes(pubkey_bytes)
.get(index)
.map(Cow::Borrowed)
.ok_or(Error::UnknownValidatorPubkey(*pubkey_bytes))?;

Expand Down
16 changes: 6 additions & 10 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1191,11 +1191,9 @@ where
.iter()
.enumerate()
.map(|(subcommittee_position, pubkey)| {
let validator_index = self
.chain
.validator_index(pubkey)
.expect("should find validator index")
.expect("pubkey should exist in the beacon chain");
let validator_index = state
.get_validator_index_readonly(pubkey)
.expect("should find validator index");

let sync_message = SyncCommitteeMessage::new::<E>(
message_slot,
Expand Down Expand Up @@ -1403,11 +1401,9 @@ where
.unwrap()
.iter()
.find_map(|pubkey| {
let validator_index = self
.chain
.validator_index(pubkey)
.expect("should find validator index")
.expect("pubkey should exist in the beacon chain");
let validator_index = state
.get_validator_index_readonly(pubkey)
.expect("should find validator index");

let selection_proof = SyncSelectionProof::new::<E>(
slot,
Expand Down
37 changes: 2 additions & 35 deletions beacon_node/beacon_chain/src/validator_pubkey_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::errors::BeaconChainError;
use crate::{BeaconChainTypes, BeaconStore};
use ssz::{Decode, Encode};
use std::collections::HashMap;
use std::convert::TryInto;
use std::marker::PhantomData;
use store::{DBColumn, Error as StoreError, StoreItem, StoreOp};
Expand All @@ -17,7 +16,6 @@ use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes};
/// Decompression is expensive when many keys are involved.
pub struct ValidatorPubkeyCache<T: BeaconChainTypes> {
pubkeys: Vec<PublicKey>,
indices: HashMap<PublicKeyBytes, usize>,
pubkey_bytes: Vec<PublicKeyBytes>,
_phantom: PhantomData<T>,
}
Expand All @@ -32,7 +30,6 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
) -> Result<Self, BeaconChainError> {
let mut cache = Self {
pubkeys: vec![],
indices: HashMap::new(),
pubkey_bytes: vec![],
_phantom: PhantomData,
};
Expand All @@ -46,7 +43,6 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
/// Load the pubkey cache from the given on-disk database.
pub fn load_from_store(store: BeaconStore<T>) -> Result<Self, BeaconChainError> {
let mut pubkeys = vec![];
let mut indices = HashMap::new();
let mut pubkey_bytes = vec![];

for validator_index in 0.. {
Expand All @@ -57,15 +53,13 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
BeaconChainError::ValidatorPubkeyCacheError(format!("{:?}", e))
})?);
pubkey_bytes.push(pubkey);
indices.insert(pubkey, validator_index);
} else {
break;
}
}

Ok(ValidatorPubkeyCache {
pubkeys,
indices,
pubkey_bytes,
_phantom: PhantomData,
})
Expand Down Expand Up @@ -101,16 +95,11 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
{
self.pubkey_bytes.reserve(validator_keys.len());
self.pubkeys.reserve(validator_keys.len());
self.indices.reserve(validator_keys.len());

let mut store_ops = Vec::with_capacity(validator_keys.len());
for pubkey in validator_keys {
let i = self.pubkeys.len();

if self.indices.contains_key(&pubkey) {
return Err(BeaconChainError::DuplicateValidatorPublicKey);
}

// Stage the new validator key for writing to disk.
// It will be committed atomically when the block that introduced it is written to disk.
// Notably it is NOT written while the write lock on the cache is held.
Expand All @@ -125,8 +114,6 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
.map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?,
);
self.pubkey_bytes.push(pubkey);

self.indices.insert(pubkey, i);
}

Ok(store_ops)
Expand All @@ -137,29 +124,19 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
self.pubkeys.get(i)
}

/// Get the `PublicKey` for a validator with `PublicKeyBytes`.
pub fn get_pubkey_from_pubkey_bytes(&self, pubkey: &PublicKeyBytes) -> Option<&PublicKey> {
self.get_index(pubkey).and_then(|index| self.get(index))
}

/// Get the public key (in bytes form) for a validator with index `i`.
pub fn get_pubkey_bytes(&self, i: usize) -> Option<&PublicKeyBytes> {
self.pubkey_bytes.get(i)
}

/// Get the index of a validator with `pubkey`.
pub fn get_index(&self, pubkey: &PublicKeyBytes) -> Option<usize> {
self.indices.get(pubkey).copied()
}

/// Returns the number of validators in the cache.
pub fn len(&self) -> usize {
self.indices.len()
self.pubkeys.len()
}

/// Returns `true` if there are no validators in the cache.
pub fn is_empty(&self) -> bool {
self.indices.is_empty()
self.pubkeys.is_empty()
}
}

Expand Down Expand Up @@ -226,16 +203,6 @@ mod test {
if i < validator_count {
let pubkey = cache.get(i).expect("pubkey should be present");
assert_eq!(pubkey, &keypairs[i].pk, "pubkey should match cache");

let pubkey_bytes: PublicKeyBytes = pubkey.clone().into();

assert_eq!(
i,
cache
.get_index(&pubkey_bytes)
.expect("should resolve index"),
"index should match cache"
);
} else {
assert_eq!(
cache.get(i),
Expand Down
Loading

0 comments on commit e741cae

Please sign in to comment.