Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop beacon_chain pubkey to index map cache #17

Draft
wants to merge 5 commits into
base: unstable
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 0 additions & 44 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1462,50 +1462,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.state_at_slot(self.slot()?, StateSkipConfig::WithStateRoots)
}

/// Returns the validator index (if any) for the given public key.
///
/// ## Notes
///
/// This query uses the `validator_pubkey_cache` which contains _all_ validators ever seen,
/// even if those validators aren't included in the head state. It is important to remember
/// that just because a validator exists here, it doesn't necessarily exist in all
/// `BeaconStates`.
///
/// ## Errors
///
/// May return an error if acquiring a read-lock on the `validator_pubkey_cache` times out.
pub fn validator_index(&self, pubkey: &PublicKeyBytes) -> Result<Option<usize>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;

Ok(pubkey_cache.get_index(pubkey))
}

/// Return the validator indices of all public keys fetched from an iterator.
///
/// If any public key doesn't belong to a known validator then an error will be returned.
/// We could consider relaxing this by returning `Vec<Option<usize>>` in future.
pub fn validator_indices<'a>(
&self,
validator_pubkeys: impl Iterator<Item = &'a PublicKeyBytes>,
) -> Result<Vec<u64>, Error> {
let pubkey_cache = self
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?;

validator_pubkeys
.map(|pubkey| {
pubkey_cache
.get_index(pubkey)
.map(|id| id as u64)
.ok_or(Error::ValidatorPubkeyUnknown(*pubkey))
})
.collect()
}

/// Returns the validator pubkey (if any) for the given validator index.
///
/// ## Notes
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2083,7 +2083,7 @@ fn get_signature_verifier<'a, T: BeaconChainTypes>(

let decompressor = move |pk_bytes| {
// Map compressed pubkey to validator index.
let validator_index = validator_pubkey_cache.get_index(pk_bytes)?;
let validator_index = state.get_validator_index_readonly(pk_bytes)?;
// Map validator index to pubkey (respecting guard on unknown validators).
get_pubkey(validator_index)
};
Expand Down
39 changes: 26 additions & 13 deletions beacon_node/beacon_chain/src/sync_committee_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,22 @@ impl<T: BeaconChainTypes> VerifiedSyncContribution<T> {
.filter_map(|(pubkey, bit)| bit.then_some(pubkey))
.collect::<Vec<_>>();

let participant_indices = {
// TODO(lion): okay to get head state here?
let head_state = &chain.head_snapshot().beacon_state;

participant_pubkeys
.iter()
.map(|pubkey| {
head_state
.get_validator_index_readonly(pubkey)
.ok_or(Error::UnknownValidatorPubkey(*pubkey))
})
.collect::<Result<Vec<_>, _>>()?
};

// Ensure that all signatures are valid.
if !verify_signed_aggregate_signatures(
chain,
&signed_aggregate,
participant_pubkeys.as_slice(),
)? {
if !verify_signed_aggregate_signatures(chain, &signed_aggregate, &participant_indices)? {
return Err(Error::InvalidSignature);
}

Expand Down Expand Up @@ -617,7 +627,7 @@ pub fn verify_propagation_slot_range<S: SlotClock, U: SlotData>(
pub fn verify_signed_aggregate_signatures<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
signed_aggregate: &SignedContributionAndProof<T::EthSpec>,
participant_pubkeys: &[PublicKeyBytes],
participant_indexes: &[usize],
) -> Result<bool, Error> {
let pubkey_cache = chain
.validator_pubkey_cache
Expand Down Expand Up @@ -651,12 +661,8 @@ pub fn verify_signed_aggregate_signatures<T: BeaconChainTypes>(
)
.map_err(BeaconChainError::SignatureSetError)?,
sync_committee_contribution_signature_set_from_pubkeys::<T::EthSpec, _>(
|validator_index| {
pubkey_cache
.get_pubkey_from_pubkey_bytes(validator_index)
.map(Cow::Borrowed)
},
participant_pubkeys,
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
participant_indexes,
&signed_aggregate.message.contribution.signature,
signed_aggregate
.message
Expand All @@ -683,13 +689,20 @@ pub fn verify_sync_committee_message<T: BeaconChainTypes>(
let signature_setup_timer =
metrics::start_timer(&metrics::SYNC_MESSAGE_PROCESSING_SIGNATURE_SETUP_TIMES);

let index = chain
.head_snapshot()
.beacon_state
.pubkey_cache()
.get(pubkey_bytes)
.ok_or(Error::UnknownValidatorPubkey(*pubkey_bytes))?;

let pubkey_cache = chain
.validator_pubkey_cache
.try_read_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(BeaconChainError::ValidatorPubkeyCacheLockTimeout)?;

let pubkey = pubkey_cache
.get_pubkey_from_pubkey_bytes(pubkey_bytes)
.get(index)
.map(Cow::Borrowed)
.ok_or(Error::UnknownValidatorPubkey(*pubkey_bytes))?;

Expand Down
16 changes: 6 additions & 10 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1191,11 +1191,9 @@ where
.iter()
.enumerate()
.map(|(subcommittee_position, pubkey)| {
let validator_index = self
.chain
.validator_index(pubkey)
.expect("should find validator index")
.expect("pubkey should exist in the beacon chain");
let validator_index = state
.get_validator_index_readonly(pubkey)
.expect("should find validator index");

let sync_message = SyncCommitteeMessage::new::<E>(
message_slot,
Expand Down Expand Up @@ -1403,11 +1401,9 @@ where
.unwrap()
.iter()
.find_map(|pubkey| {
let validator_index = self
.chain
.validator_index(pubkey)
.expect("should find validator index")
.expect("pubkey should exist in the beacon chain");
let validator_index = state
.get_validator_index_readonly(pubkey)
.expect("should find validator index");

let selection_proof = SyncSelectionProof::new::<E>(
slot,
Expand Down
37 changes: 2 additions & 35 deletions beacon_node/beacon_chain/src/validator_pubkey_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::errors::BeaconChainError;
use crate::{BeaconChainTypes, BeaconStore};
use ssz::{Decode, Encode};
use std::collections::HashMap;
use std::convert::TryInto;
use std::marker::PhantomData;
use store::{DBColumn, Error as StoreError, StoreItem, StoreOp};
Expand All @@ -17,7 +16,6 @@ use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes};
/// Decompression is expensive when many keys are involved.
pub struct ValidatorPubkeyCache<T: BeaconChainTypes> {
pubkeys: Vec<PublicKey>,
indices: HashMap<PublicKeyBytes, usize>,
pubkey_bytes: Vec<PublicKeyBytes>,
_phantom: PhantomData<T>,
}
Expand All @@ -32,7 +30,6 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
) -> Result<Self, BeaconChainError> {
let mut cache = Self {
pubkeys: vec![],
indices: HashMap::new(),
pubkey_bytes: vec![],
_phantom: PhantomData,
};
Expand All @@ -46,7 +43,6 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
/// Load the pubkey cache from the given on-disk database.
pub fn load_from_store(store: BeaconStore<T>) -> Result<Self, BeaconChainError> {
let mut pubkeys = vec![];
let mut indices = HashMap::new();
let mut pubkey_bytes = vec![];

for validator_index in 0.. {
Expand All @@ -57,15 +53,13 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
BeaconChainError::ValidatorPubkeyCacheError(format!("{:?}", e))
})?);
pubkey_bytes.push(pubkey);
indices.insert(pubkey, validator_index);
} else {
break;
}
}

Ok(ValidatorPubkeyCache {
pubkeys,
indices,
pubkey_bytes,
_phantom: PhantomData,
})
Expand Down Expand Up @@ -101,16 +95,11 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
{
self.pubkey_bytes.reserve(validator_keys.len());
self.pubkeys.reserve(validator_keys.len());
self.indices.reserve(validator_keys.len());

let mut store_ops = Vec::with_capacity(validator_keys.len());
for pubkey in validator_keys {
let i = self.pubkeys.len();

if self.indices.contains_key(&pubkey) {
return Err(BeaconChainError::DuplicateValidatorPublicKey);
}

// Stage the new validator key for writing to disk.
// It will be committed atomically when the block that introduced it is written to disk.
// Notably it is NOT written while the write lock on the cache is held.
Expand All @@ -125,8 +114,6 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
.map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?,
);
self.pubkey_bytes.push(pubkey);

self.indices.insert(pubkey, i);
}

Ok(store_ops)
Expand All @@ -137,29 +124,19 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
self.pubkeys.get(i)
}

/// Get the `PublicKey` for a validator with `PublicKeyBytes`.
pub fn get_pubkey_from_pubkey_bytes(&self, pubkey: &PublicKeyBytes) -> Option<&PublicKey> {
self.get_index(pubkey).and_then(|index| self.get(index))
}

/// Get the public key (in bytes form) for a validator with index `i`.
pub fn get_pubkey_bytes(&self, i: usize) -> Option<&PublicKeyBytes> {
self.pubkey_bytes.get(i)
}

/// Get the index of a validator with `pubkey`.
pub fn get_index(&self, pubkey: &PublicKeyBytes) -> Option<usize> {
self.indices.get(pubkey).copied()
}

/// Returns the number of validators in the cache.
pub fn len(&self) -> usize {
self.indices.len()
self.pubkeys.len()
}

/// Returns `true` if there are no validators in the cache.
pub fn is_empty(&self) -> bool {
self.indices.is_empty()
self.pubkeys.is_empty()
}
}

Expand Down Expand Up @@ -226,16 +203,6 @@ mod test {
if i < validator_count {
let pubkey = cache.get(i).expect("pubkey should be present");
assert_eq!(pubkey, &keypairs[i].pk, "pubkey should match cache");

let pubkey_bytes: PublicKeyBytes = pubkey.clone().into();

assert_eq!(
i,
cache
.get_index(&pubkey_bytes)
.expect("should resolve index"),
"index should match cache"
);
} else {
assert_eq!(
cache.get(i),
Expand Down
Loading
Loading