De-duplicate pubkey_cache between states
dapplion committed Feb 26, 2024
1 parent e741cae commit fc3e22a
Showing 6 changed files with 75 additions and 14 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

18 changes: 15 additions & 3 deletions beacon_node/store/src/hot_cold_store.rs
@@ -21,7 +21,7 @@ use crate::{
     get_key_for_col, ChunkWriter, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp,
     PartialBeaconState, StoreItem, StoreOp,
 };
-use itertools::process_results;
+use itertools::{process_results, Itertools};
 use leveldb::iterator::LevelDBIterator;
 use lru::LruCache;
 use parking_lot::{Mutex, RwLock};
@@ -1115,11 +1115,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             epoch_boundary_state_root,
         }) = self.load_hot_state_summary(state_root)?
         {
-            let boundary_state =
+            let mut boundary_state =
                 get_full_state(&self.hot_db, &epoch_boundary_state_root, &self.spec)?.ok_or(
                     HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root),
                 )?;
 
+            self.rebase_caches(&mut boundary_state);
+
             // Optimization to avoid even *thinking* about replaying blocks if we're already
             // on an epoch boundary.
             let state = if slot % E::slots_per_epoch() == 0 {
@@ -1142,6 +1144,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         }
     }
 
+    /// Rebase state pubkey_cache on some existing state to prevent re-populating the cache on the
+    /// first process_deposit call.
+    fn rebase_caches(&self, state: &mut BeaconState<E>) {
+        if let Ok((_, first_state)) = self.state_cache.lock().iter().exactly_one() {
+            state.rebase_caches_on(first_state);
+        }
+    }
+
     /// Store a pre-finalization state in the freezer database.
     ///
     /// If the state doesn't lie on a restore point boundary then just its summary will be stored.
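
The helper above rebases only when the in-memory state cache holds exactly one entry, which is what the new `Itertools` import is for: `exactly_one()` returns `Ok` for a single-item iterator and an error otherwise. A minimal, self-contained illustration of that combinator (toy values, not the store types):

use itertools::Itertools;

fn main() {
    // A single item is returned as Ok(..), so the caller can rebase on it.
    assert_eq!(vec![42].into_iter().exactly_one().ok(), Some(42));

    // Zero or multiple items yield an error, and rebasing is skipped.
    assert!(Vec::<i32>::new().into_iter().exactly_one().is_err());
    assert!(vec![1, 2].into_iter().exactly_one().is_err());
}
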
@@ -1274,7 +1284,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         }
 
         // If low_state is still None, use load_restore_point_by_index to load the state.
-        let low_state = match low_state {
+        let mut low_state = match low_state {
             Some(state) => state,
             None => self.load_restore_point_by_index(low_restore_point_idx)?,
         };
@@ -1301,6 +1311,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             &self.spec,
         )?;
 
+        self.rebase_caches(&mut low_state);
+
         let state = self.replay_blocks(
             low_state,
             blocks,
@@ -419,6 +419,9 @@ pub fn process_deposit<T: EthSpec>(
         state.validators_mut().push(validator)?;
         state.balances_mut().push(deposit.data.amount)?;
 
+        // Register newly added key to the pubkey_cache
+        state.update_pubkey_cache()?;
+
         // Altair or later initializations.
         if let Ok(previous_epoch_participation) = state.previous_epoch_participation_mut() {
             previous_epoch_participation.push(ParticipationFlags::default())?;
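
Because the deserialized state may now share a pubkey cache with other states, each deposit registers its validator's key as soon as it is appended, so later lookups see it without a full rebuild. A toy sketch of the idea behind an incremental cache update (simplified types, not Lighthouse's actual `update_pubkey_cache`):

use std::collections::HashMap;

/// Toy cache mapping a pubkey (here just a u64) to a validator index.
#[derive(Default)]
struct ToyPubkeyCache {
    len: usize,
    map: HashMap<u64, usize>,
}

/// Register only the validators appended since the cache was last updated.
fn update_pubkey_cache(validator_pubkeys: &[u64], cache: &mut ToyPubkeyCache) {
    for (index, pubkey) in validator_pubkeys.iter().enumerate().skip(cache.len) {
        cache.map.insert(*pubkey, index);
        cache.len = index + 1;
    }
}

fn main() {
    let mut cache = ToyPubkeyCache::default();
    let mut validators = vec![101, 102];
    update_pubkey_cache(&validators, &mut cache);

    // A deposit appends a validator and immediately registers its key.
    validators.push(103);
    update_pubkey_cache(&validators, &mut cache);
    assert_eq!(cache.map.get(&103), Some(&2));
}
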
3 changes: 2 additions & 1 deletion consensus/types/Cargo.toml
@@ -52,6 +52,7 @@ serde_json = { workspace = true }
 smallvec = { workspace = true }
 maplit = { workspace = true }
 strum = { workspace = true }
+rpds = "1.1.0"
 
 [dev-dependencies]
 criterion = { workspace = true }
@@ -68,4 +69,4 @@ sqlite = []
 # The `arbitrary-fuzz` feature is a no-op provided for backwards compatibility.
 # For simplicity `Arbitrary` is now derived regardless of the feature's presence.
 arbitrary-fuzz = []
-portable = ["bls/supranational-portable"]
+portable = ["bls/supranational-portable"]
23 changes: 21 additions & 2 deletions consensus/types/src/beacon_state.rs
@@ -497,11 +497,19 @@ impl<T: EthSpec> BeaconState<T> {
     /// otherwise returns `None`.
     pub fn get_validator_index(&mut self, pubkey: &PublicKeyBytes) -> Result<Option<usize>, Error> {
         self.update_pubkey_cache()?;
-        Ok(self.pubkey_cache().get(pubkey))
+        Ok(self.get_validator_index_readonly(pubkey))
     }
 
+    /// pubkey_cache may contain pubkeys from future blocks not yet known to this state. Ignore
+    /// pubkeys that resolve to indexes beyond the current validators list.
     pub fn get_validator_index_readonly(&self, pubkey: &PublicKeyBytes) -> Option<usize> {
-        self.pubkey_cache().get(pubkey)
+        self.pubkey_cache().get(pubkey).and_then(|index| {
+            if index < self.validators().len() {
+                Some(index)
+            } else {
+                None
+            }
+        })
     }
 
     /// The epoch corresponding to `self.slot()`.
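
Since a cache copied from another state can contain pubkeys registered by descendant (later) states, the read-only accessor above only trusts an index that falls inside this state's validator registry; anything at or beyond `validators().len()` is treated as a miss. A small self-contained sketch of that guard (toy types, not the `BeaconState` API):

use std::collections::HashMap;

/// Toy read-only lookup over a cache that may know about validators this state
/// does not have yet.
fn get_validator_index_readonly(
    cache: &HashMap<u64, usize>,
    num_validators: usize,
    pubkey: u64,
) -> Option<usize> {
    cache
        .get(&pubkey)
        .copied()
        .filter(|&index| index < num_validators)
}

fn main() {
    let mut cache = HashMap::new();
    cache.insert(100, 0);
    cache.insert(200, 5); // registered by a state further ahead, with more validators

    // This state only has 3 validators, so index 5 must be treated as unknown.
    assert_eq!(get_validator_index_readonly(&cache, 3, 100), Some(0));
    assert_eq!(get_validator_index_readonly(&cache, 3, 200), None);
}
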
@@ -1840,6 +1848,17 @@ impl<T: EthSpec> BeaconState<T> {
         Ok(sync_committee)
     }
 
+    pub fn rebase_caches_on(&mut self, base: &Self) {
+        // Use pubkey cache from `base` if it contains superior information (likely if our cache is
+        // uninitialized).
+        let pubkey_cache = self.pubkey_cache_mut();
+        let base_pubkey_cache = base.pubkey_cache();
+        // Note: It's okay to clone a cache with pubkeys from future blocks.
+        if pubkey_cache.len() < base_pubkey_cache.len() {
+            *pubkey_cache = base_pubkey_cache.clone();
+        }
+    }
+
     pub fn compute_merkle_proof(
         &mut self,
         generalized_index: usize,
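
The rebase rule itself is simple: adopt the base state's cache whenever it knows more keys than ours (typically ours is empty because the state was just loaded from disk), and rely on the bounds check above to hide any entries that are "from the future" of this state. A toy sketch of that rule (a plain HashMap stands in here, so the clone is a deep copy; the real rpds-backed cache makes it a cheap structural share):

use std::collections::HashMap;

#[derive(Clone, Default)]
struct ToyCache {
    len: usize,
    map: HashMap<u64, usize>,
}

/// Adopt `base`'s cache if it contains more keys than ours.
fn rebase_caches_on(own: &mut ToyCache, base: &ToyCache) {
    if own.len < base.len {
        *own = base.clone();
    }
}

fn main() {
    // e.g. an epoch-boundary state freshly deserialized from the hot database
    let mut fresh = ToyCache::default();

    let mut head = ToyCache::default();
    head.map.insert(100, 0);
    head.map.insert(200, 1);
    head.len = 2;

    rebase_caches_on(&mut fresh, &head);
    assert_eq!(fresh.map.get(&200), Some(&1)); // no need to rebuild the cache
}
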
16 changes: 8 additions & 8 deletions consensus/types/src/beacon_state/pubkey_cache.rs
@@ -1,20 +1,20 @@
 use crate::*;
-use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
+use rpds::HashTrieMapSync;
 
 type ValidatorIndex = usize;
 
-#[derive(Debug, PartialEq, Clone, Default, Serialize, Deserialize)]
+#[derive(Debug, PartialEq, Clone, Default)]
 pub struct PubkeyCache {
-    /// Maintain the number of keys added to the map. It is not sufficient to just use the HashMap
-    /// len, as it does not increase when duplicate keys are added. Duplicate keys are used during
-    /// testing.
+    /// Maintain the number of keys added to the map. It is not sufficient to just use the
+    /// HashTrieMap len, as it does not increase when duplicate keys are added. Duplicate keys are
+    /// used during testing.
     len: usize,
-    map: HashMap<PublicKeyBytes, ValidatorIndex>,
+    map: HashTrieMapSync<PublicKeyBytes, ValidatorIndex>,
 }
 
 impl PubkeyCache {
     /// Returns the number of validator indices added to the map so far.
+    #[allow(clippy::len_without_is_empty)]
     pub fn len(&self) -> ValidatorIndex {
         self.len
     }
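
Swapping `HashMap` for rpds' `HashTrieMapSync` is what makes the de-duplication worthwhile: cloning a persistent map is cheap and the clone shares its internal nodes with the original, so many states can hold what is effectively the same cache, and `insert_mut` on one clone does not disturb the others. A minimal demo, assuming the rpds 1.1 API used in this diff (`insert_mut`, `get`) and that `HashTrieMapSync` implements `Default` (it must, since the struct derives it):

use rpds::HashTrieMapSync;

fn main() {
    let mut head: HashTrieMapSync<u64, usize> = HashTrieMapSync::default();
    head.insert_mut(100, 0);
    head.insert_mut(200, 1);

    // Cloning shares structure instead of copying every entry, so handing the
    // cache to another state is cheap even for very large validator sets.
    let mut child = head.clone();

    // The clone can diverge without affecting the original.
    child.insert_mut(300, 2);
    assert_eq!(child.get(&300), Some(&2));
    assert_eq!(head.get(&300), None);
}
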
@@ -25,7 +25,7 @@ impl PubkeyCache {
     /// that an index is never skipped.
     pub fn insert(&mut self, pubkey: PublicKeyBytes, index: ValidatorIndex) -> bool {
         if index == self.len {
-            self.map.insert(pubkey, index);
+            self.map.insert_mut(pubkey, index);
             self.len = self
                 .len
                 .checked_add(1)
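
`insert` keeps its densely-indexed contract: a key is only accepted for the next index (`index == self.len`), and `len` counts insertions rather than distinct keys, so a duplicate pubkey (which the tests rely on) bumps `len` while merely overwriting the map entry. A compact sketch of that behaviour on a toy reimplementation (not the real `PubkeyCache`):

use rpds::HashTrieMapSync;

#[derive(Default)]
struct ToyPubkeyCache {
    len: usize,
    map: HashTrieMapSync<u64, usize>,
}

impl ToyPubkeyCache {
    /// Only accept the next index so that indices are never skipped.
    fn insert(&mut self, pubkey: u64, index: usize) -> bool {
        if index == self.len {
            self.map.insert_mut(pubkey, index);
            self.len += 1;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut cache = ToyPubkeyCache::default();
    assert!(cache.insert(100, 0));
    assert!(!cache.insert(200, 5)); // index gap: rejected

    // Duplicate key at the next index: len grows, the map entry is overwritten.
    assert!(cache.insert(100, 1));
    assert_eq!(cache.len, 2);
    assert_eq!(cache.map.get(&100), Some(&1));
}
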
