diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs
index b5e444e809d..30c8869585e 100644
--- a/beacon_node/beacon_chain/tests/store_tests.rs
+++ b/beacon_node/beacon_chain/tests/store_tests.rs
@@ -21,9 +21,11 @@ use std::collections::HashSet;
 use std::convert::TryInto;
 use std::sync::Arc;
 use std::time::Duration;
+use store::hdiff::HierarchyConfig;
 use store::{
+    config::StoreConfigError,
     iter::{BlockRootsIterator, StateRootsIterator},
-    HotColdDB, LevelDB, StoreConfig,
+    Error as StoreError, HotColdDB, LevelDB, StoreConfig,
 };
 use tempfile::{tempdir, TempDir};
 use types::test_utils::{SeedableRng, XorShiftRng};
@@ -49,13 +51,25 @@
 fn get_store_with_spec(
     db_path: &TempDir,
     spec: ChainSpec,
 ) -> Arc<HotColdDB<MinimalEthSpec, LevelDB<MinimalEthSpec>, LevelDB<MinimalEthSpec>>> {
+    let config = StoreConfig {
+        // Use more frequent snapshots and hdiffs in tests.
+        hierarchy_config: HierarchyConfig {
+            exponents: vec![1, 3, 5],
+        },
+        ..Default::default()
+    };
+    try_get_store_with_spec_and_config(db_path, spec, config).expect("disk store should initialize")
+}
+
+fn try_get_store_with_spec_and_config(
+    db_path: &TempDir,
+    spec: ChainSpec,
+    config: StoreConfig,
+) -> Result<Arc<HotColdDB<MinimalEthSpec, LevelDB<MinimalEthSpec>, LevelDB<MinimalEthSpec>>>, StoreError> {
     let hot_path = db_path.path().join("hot_db");
     let cold_path = db_path.path().join("cold_db");
-    let config = StoreConfig::default();
     let log = test_logger();

     HotColdDB::open(&hot_path, &cold_path, |_, _, _| Ok(()), config, spec, log)
-        .expect("disk store should initialize")
 }

 fn get_harness(
@@ -2481,6 +2495,43 @@ async fn revert_minority_fork_on_resume() {
     assert_eq!(heads.len(), 1);
 }

+#[tokio::test]
+#[ignore]
+// FIXME(jimmy): Ignoring this for now as the test is flaky :/ It intermittently fails with an IO
+// error "..cold_db/LOCK file held by another process". There seems to be some race condition
+// between dropping the lock file and re-opening the db. The test is more likely to fail when the
+// entire test suite is run, perhaps because the cold_db LOCK file is not dropped quickly enough
+// before the test attempts to open the db again.
+async fn should_not_initialize_incompatible_store_config() {
+    let validator_count = 16;
+    let spec = MinimalEthSpec::default_spec();
+    let db_path = tempdir().unwrap();
+    let store_config = StoreConfig::default();
+    let store = try_get_store_with_spec_and_config(&db_path, spec.clone(), store_config.clone())
+        .expect("disk store should initialize");
+    let harness = BeaconChainHarness::builder(MinimalEthSpec)
+        .spec(spec.clone())
+        .deterministic_keypairs(validator_count)
+        .fresh_disk_store(store)
+        .build();
+
+    // Resume from disk with a different store config.
+    drop(harness);
+    let different_store_config = StoreConfig {
+        linear_blocks: !store_config.linear_blocks,
+        ..store_config
+    };
+    let maybe_err =
+        try_get_store_with_spec_and_config(&db_path, spec, different_store_config).err();
+
+    assert!(matches!(
+        maybe_err,
+        Some(StoreError::ConfigError(
+            StoreConfigError::IncompatibleStoreConfig { .. }
+        ))
+    ));
+}
+
 // This test checks whether the schema downgrade from the latest version to some minimum supported
 // version is correct.
 // This is the easiest schema test to write without historic versions of
 // Lighthouse on-hand, but has the disadvantage that the min version needs to be adjusted manually
diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs
index 738680286b4..5d0073f9684 100644
--- a/beacon_node/src/cli.rs
+++ b/beacon_node/src/cli.rs
@@ -533,6 +533,21 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
                 [default: 8192 (mainnet) or 64 (minimal)]")
                 .takes_value(true)
         )
+        .arg(
+            Arg::with_name("hierarchy-exponents")
+                .long("hierarchy-exponents")
+                .value_name("EXPONENTS")
+                .help("Specifies the frequency for storing full state snapshots and hierarchical \
+                       diffs in the freezer DB. Accepts a comma-separated list of ascending \
+                       exponents. Each exponent defines an interval for storing diffs to the \
+                       layer above. The last exponent defines the interval for full snapshots. \
+                       For example, a config of '4,8,12' would store a full snapshot every \
+                       4096 (2^12) slots, first-level diffs every 256 (2^8) slots, and \
+                       second-level diffs every 16 (2^4) slots. Cannot be changed after \
+                       initialization. \
+                       [default: 5,9,11,13,16,18,21]")
+                .takes_value(true)
+        )
         .arg(
             Arg::with_name("epochs-per-migration")
                 .long("epochs-per-migration")
diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs
index 48914b16a72..ad7a7c1aaaf 100644
--- a/beacon_node/src/config.rs
+++ b/beacon_node/src/config.rs
@@ -23,6 +23,7 @@ use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs};
 use std::path::{Path, PathBuf};
 use std::str::FromStr;
 use std::time::Duration;
+use store::hdiff::HierarchyConfig;
 use types::{Checkpoint, Epoch, EthSpec, Hash256, PublicKeyBytes, GRAFFITI_BYTES_LEN};

 /// Gets the fully-initialized global client.
@@ -422,6 +423,24 @@
         client_config.store.epochs_per_state_diff = epochs_per_state_diff;
     }

+    if let Some(hierarchy_exponents) =
+        clap_utils::parse_optional::<String>(cli_args, "hierarchy-exponents")?
+    {
+        let exponents = hierarchy_exponents
+            .split(',')
+            .map(|s| {
+                s.parse()
+                    .map_err(|e| format!("invalid hierarchy-exponents: {e:?}"))
+            })
+            .collect::<Result<Vec<u8>, _>>()?;
+
+        if exponents.windows(2).any(|w| w[0] >= w[1]) {
+            return Err("hierarchy-exponents must be in ascending order".to_string());
+        }
+
+        client_config.store.hierarchy_config = HierarchyConfig { exponents };
+    }
+
     if let Some(epochs_per_migration) =
         clap_utils::parse_optional(cli_args, "epochs-per-migration")?
     {
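Annotation: the interval arithmetic in the `--hierarchy-exponents` help text can be made concrete with a small standalone sketch. This is illustrative only, not Lighthouse code; the `exponents`/`moduli` names merely mirror `HierarchyConfig` and `HierarchyModuli` from the store crate:

```rust
// Illustrative sketch of the `--hierarchy-exponents` interval arithmetic.
// Each exponent e yields a modulus 2^e; not Lighthouse code.
fn main() {
    let exponents: [u8; 3] = [4, 8, 12];
    let moduli: Vec<u64> = exponents.iter().map(|&e| 1u64 << e).collect();

    // The last (largest) modulus is the full-snapshot interval: 4096 slots.
    println!("full snapshot every {} slots", moduli[moduli.len() - 1]);

    // Lower layers hold diffs against the layer above, so diffs become more
    // frequent as we descend: every 256 slots, then every 16 slots.
    for (level, m) in moduli.iter().rev().skip(1).enumerate() {
        println!("level-{} diff every {} slots", level + 1, m);
    }
}
```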
diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs
index f701a03aae0..18878e95e6c 100644
--- a/beacon_node/store/src/config.rs
+++ b/beacon_node/store/src/config.rs
@@ -45,16 +45,25 @@ pub struct StoreConfig {
 /// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
 #[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
-// FIXME(sproul): schema migration, add hdiff
+// FIXME(sproul): schema migration
 pub struct OnDiskStoreConfig {
     pub linear_blocks: bool,
-    pub linear_restore_points: bool,
+    pub hierarchy_config: HierarchyConfig,
 }

 #[derive(Debug, Clone)]
 pub enum StoreConfigError {
-    MismatchedSlotsPerRestorePoint { config: u64, on_disk: u64 },
-    InvalidCompressionLevel { level: i32 },
+    MismatchedSlotsPerRestorePoint {
+        config: u64,
+        on_disk: u64,
+    },
+    InvalidCompressionLevel {
+        level: i32,
+    },
+    IncompatibleStoreConfig {
+        config: OnDiskStoreConfig,
+        on_disk: OnDiskStoreConfig,
+    },
 }

 impl Default for StoreConfig {
@@ -80,15 +89,21 @@ impl StoreConfig {
     pub fn as_disk_config(&self) -> OnDiskStoreConfig {
         OnDiskStoreConfig {
             linear_blocks: self.linear_blocks,
-            linear_restore_points: self.linear_restore_points,
+            hierarchy_config: self.hierarchy_config.clone(),
         }
     }

     pub fn check_compatibility(
         &self,
-        _on_disk_config: &OnDiskStoreConfig,
+        on_disk_config: &OnDiskStoreConfig,
     ) -> Result<(), StoreConfigError> {
-        // FIXME(sproul): TODO
+        let db_config = self.as_disk_config();
+        if db_config.ne(on_disk_config) {
+            return Err(StoreConfigError::IncompatibleStoreConfig {
+                config: db_config,
+                on_disk: on_disk_config.clone(),
+            });
+        }
         Ok(())
     }

@@ -146,3 +161,49 @@ impl StoreItem for OnDiskStoreConfig {
         Ok(Self::from_ssz_bytes(bytes)?)
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn check_compatibility_ok() {
+        let store_config = StoreConfig {
+            linear_blocks: true,
+            ..Default::default()
+        };
+        let on_disk_config = OnDiskStoreConfig {
+            linear_blocks: true,
+            hierarchy_config: store_config.hierarchy_config.clone(),
+        };
+        assert!(store_config.check_compatibility(&on_disk_config).is_ok());
+    }
+
+    #[test]
+    fn check_compatibility_linear_blocks_mismatch() {
+        let store_config = StoreConfig {
+            linear_blocks: true,
+            ..Default::default()
+        };
+        let on_disk_config = OnDiskStoreConfig {
+            linear_blocks: false,
+            hierarchy_config: store_config.hierarchy_config.clone(),
+        };
+        assert!(store_config.check_compatibility(&on_disk_config).is_err());
+    }
+
+    #[test]
+    fn check_compatibility_hierarchy_config_incompatible() {
+        let store_config = StoreConfig {
+            linear_blocks: true,
+            ..Default::default()
+        };
+        let on_disk_config = OnDiskStoreConfig {
+            linear_blocks: true,
+            hierarchy_config: HierarchyConfig {
+                exponents: vec![5, 8, 11, 13, 16, 18, 21],
+            },
+        };
+        assert!(store_config.check_compatibility(&on_disk_config).is_err());
+    }
+}
diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs
index 9d7834e4fc2..2a170b6f0f2 100644
--- a/beacon_node/store/src/errors.rs
+++ b/beacon_node/store/src/errors.rs
@@ -41,7 +41,7 @@ pub enum Error {
     },
     MissingStateRoot(Slot),
     MissingState(Hash256),
-    MissingSnapshot(Epoch),
+    MissingSnapshot(Slot),
     MissingDiff(Epoch),
     NoBaseStateFound(Hash256),
     BlockReplayError(BlockReplayError),
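Annotation: since `check_compatibility` now compares the entire `OnDiskStoreConfig` for equality, any change to `linear_blocks` or `hierarchy_config` against an existing database is rejected at startup. A minimal sketch of how that surfaces, assuming the `store::config` and `store::hdiff` module paths used by the test imports above (the `main` wrapper is illustrative):

```rust
use store::config::{OnDiskStoreConfig, StoreConfig, StoreConfigError};
use store::hdiff::HierarchyConfig;

fn main() {
    // A runtime config whose hierarchy differs from the one the DB was created with.
    let runtime_config = StoreConfig {
        hierarchy_config: HierarchyConfig {
            exponents: vec![4, 8, 12],
        },
        ..Default::default()
    };
    // Pretend the database was initialised with the default config.
    let on_disk: OnDiskStoreConfig = StoreConfig::default().as_disk_config();

    match runtime_config.check_compatibility(&on_disk) {
        Err(StoreConfigError::IncompatibleStoreConfig { config, on_disk }) => {
            eprintln!("refusing to start: {config:?} (cli) != {on_disk:?} (disk)")
        }
        Err(e) => eprintln!("store config error: {e:?}"),
        Ok(()) => println!("config compatible"),
    }
}
```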
diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs
index a1421ee3bb9..cdb88d37680 100644
--- a/beacon_node/store/src/hdiff.rs
+++ b/beacon_node/store/src/hdiff.rs
@@ -5,21 +5,21 @@ use serde::{Deserialize, Serialize};
 use ssz::{Decode, Encode};
 use ssz_derive::{Decode, Encode};
 use std::io::{Read, Write};
-use types::{BeaconState, ChainSpec, Epoch, EthSpec, VList};
+use types::{BeaconState, ChainSpec, EthSpec, Slot, VList};
 use zstd::{Decoder, Encoder};

 #[derive(Debug)]
 pub enum Error {
     InvalidHierarchy,
-    XorDeletionsNotSupported,
+    U64DiffDeletionsNotSupported,
     UnableToComputeDiff,
     UnableToApplyDiff,
     Compression(std::io::Error),
 }

-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
 pub struct HierarchyConfig {
-    exponents: Vec<u8>,
+    pub exponents: Vec<u8>,
 }

 #[derive(Debug)]
 pub struct HierarchyModuli {
@@ -29,8 +29,8 @@

 #[derive(Debug, PartialEq, Eq)]
 pub enum StorageStrategy {
-    Nothing,
-    DiffFrom(Epoch),
+    ReplayFrom(Slot),
+    DiffFrom(Slot),
     Snapshot,
 }
@@ -45,7 +45,7 @@ pub struct HDiffBuffer {

 #[derive(Debug, Encode, Decode)]
 pub struct HDiff {
     state_diff: BytesDiff,
-    balances_diff: XorDiff,
+    balances_diff: CompressedU64Diff,
 }

 #[derive(Debug, Encode, Decode)]
@@ -54,7 +54,7 @@ pub struct BytesDiff {
 }

 #[derive(Debug, Encode, Decode)]
-pub struct XorDiff {
+pub struct CompressedU64Diff {
     bytes: Vec<u8>,
 }
@@ -78,7 +78,7 @@ impl HDiffBuffer {
 impl HDiff {
     pub fn compute(source: &HDiffBuffer, target: &HDiffBuffer) -> Result<Self, Error> {
         let state_diff = BytesDiff::compute(&source.state, &target.state)?;
-        let balances_diff = XorDiff::compute(&source.balances, &target.balances)?;
+        let balances_diff = CompressedU64Diff::compute(&source.balances, &target.balances)?;

         Ok(Self {
             state_diff,
@@ -138,10 +138,10 @@ impl BytesDiff {
     }
 }

-impl XorDiff {
+impl CompressedU64Diff {
     pub fn compute(xs: &[u64], ys: &[u64]) -> Result<Self, Error> {
         if xs.len() > ys.len() {
-            return Err(Error::XorDeletionsNotSupported);
+            return Err(Error::U64DiffDeletionsNotSupported);
         }

         let uncompressed_bytes: Vec<u8> = ys
@@ -164,7 +164,7 @@
             .map_err(Error::Compression)?;
         encoder.finish().map_err(Error::Compression)?;

-        Ok(XorDiff {
+        Ok(CompressedU64Diff {
             bytes: compressed_bytes,
         })
     }
@@ -198,7 +198,7 @@
 impl Default for HierarchyConfig {
     fn default() -> Self {
         HierarchyConfig {
-            exponents: vec![0, 4, 6, 8, 11, 13, 16],
+            exponents: vec![5, 9, 11, 13, 16, 18, 21],
         }
     }
 }
@@ -226,30 +226,45 @@ impl HierarchyConfig {
 }

 impl HierarchyModuli {
-    pub fn storage_strategy(&self, epoch: Epoch) -> Result<StorageStrategy, Error> {
+    pub fn storage_strategy(&self, slot: Slot) -> Result<StorageStrategy, Error> {
         let last = self.moduli.last().copied().ok_or(Error::InvalidHierarchy)?;
-
-        if epoch % last == 0 {
+        let first = self
+            .moduli
+            .first()
+            .copied()
+            .ok_or(Error::InvalidHierarchy)?;
+        let replay_from = slot / first * first;
+
+        if slot % last == 0 {
             return Ok(StorageStrategy::Snapshot);
         }

-        let diff_from = self.moduli.iter().rev().find_map(|&n| {
-            (epoch % n == 0).then(|| {
-                // Diff from the previous state.
-                (epoch - 1) / n * n
-            })
-        });
-        Ok(diff_from.map_or(StorageStrategy::Nothing, StorageStrategy::DiffFrom))
+        let diff_from = self
+            .moduli
+            .iter()
+            .rev()
+            .tuple_windows()
+            .find_map(|(&n_big, &n_small)| {
+                (slot % n_small == 0).then(|| {
+                    // Diff from the previous layer.
+                    slot / n_big * n_big
+                })
+            });
+
+        Ok(diff_from.map_or(
+            StorageStrategy::ReplayFrom(replay_from),
+            StorageStrategy::DiffFrom,
+        ))
     }

-    /// Return the smallest epoch greater than or equal to `epoch` at which a full snapshot should
+    /// Return the smallest slot greater than or equal to `slot` at which a full snapshot should
     /// be stored.
-    pub fn next_snapshot_epoch(&self, epoch: Epoch) -> Result<Epoch, Error> {
+    pub fn next_snapshot_slot(&self, slot: Slot) -> Result<Slot, Error> {
         let last = self.moduli.last().copied().ok_or(Error::InvalidHierarchy)?;
-        if epoch % last == 0 {
-            Ok(epoch)
+        if slot % last == 0 {
+            Ok(slot)
         } else {
-            Ok((epoch / last + 1) * last)
+            Ok((slot / last + 1) * last)
         }
     }
 }
@@ -265,10 +280,10 @@ mod tests {

         let moduli = config.to_moduli().unwrap();

-        // Full snapshots at multiples of 2^16.
-        let snapshot_freq = Epoch::new(1 << 16);
+        // Full snapshots at multiples of 2^21.
+        let snapshot_freq = Slot::new(1 << 21);
         assert_eq!(
-            moduli.storage_strategy(Epoch::new(0)).unwrap(),
+            moduli.storage_strategy(Slot::new(0)).unwrap(),
             StorageStrategy::Snapshot
         );
         assert_eq!(
@@ -280,46 +295,52 @@
             StorageStrategy::Snapshot
         );

-        // For the first layer of diffs
-        let first_layer = Epoch::new(1 << 13);
+        // Diffs should be from the previous layer (the snapshot in this case), and not the
+        // previous diff in the same layer.
+        let first_layer = Slot::new(1 << 18);
         assert_eq!(
             moduli.storage_strategy(first_layer * 2).unwrap(),
-            StorageStrategy::DiffFrom(first_layer)
+            StorageStrategy::DiffFrom(Slot::new(0))
+        );
+
+        let replay_strategy_slot = first_layer + 1;
+        assert_eq!(
+            moduli.storage_strategy(replay_strategy_slot).unwrap(),
+            StorageStrategy::ReplayFrom(first_layer)
         );
     }

     #[test]
-    fn next_snapshot_epoch() {
+    fn next_snapshot_slot() {
         let config = HierarchyConfig::default();
         config.validate().unwrap();

         let moduli = config.to_moduli().unwrap();
-        let snapshot_freq = Epoch::new(1 << 16);
+        let snapshot_freq = Slot::new(1 << 21);

         assert_eq!(
-            moduli.next_snapshot_epoch(snapshot_freq).unwrap(),
+            moduli.next_snapshot_slot(snapshot_freq).unwrap(),
             snapshot_freq
         );
         assert_eq!(
-            moduli.next_snapshot_epoch(snapshot_freq + 1).unwrap(),
+            moduli.next_snapshot_slot(snapshot_freq + 1).unwrap(),
             snapshot_freq * 2
         );
         assert_eq!(
-            moduli.next_snapshot_epoch(snapshot_freq * 2 - 1).unwrap(),
+            moduli.next_snapshot_slot(snapshot_freq * 2 - 1).unwrap(),
             snapshot_freq * 2
         );
         assert_eq!(
-            moduli.next_snapshot_epoch(snapshot_freq * 2).unwrap(),
+            moduli.next_snapshot_slot(snapshot_freq * 2).unwrap(),
             snapshot_freq * 2
         );
         assert_eq!(
-            moduli.next_snapshot_epoch(snapshot_freq * 100).unwrap(),
+            moduli.next_snapshot_slot(snapshot_freq * 100).unwrap(),
             snapshot_freq * 100
         );
     }

     #[test]
-    fn xor_vs_bytes_diff() {
+    fn compressed_u64_vs_bytes_diff() {
         let x_values = vec![99u64, 55, 123, 6834857, 0, 12];
         let y_values = vec![98u64, 55, 312, 1, 1, 2, 4, 5];

         let x_bytes = to_bytes(&x_values);
         let y_bytes = to_bytes(&y_values);

-        let xor_diff = XorDiff::compute(&x_values, &y_values).unwrap();
+        let u64_diff = CompressedU64Diff::compute(&x_values, &y_values).unwrap();

-        let mut y_from_xor = x_values;
-        xor_diff.apply(&mut y_from_xor).unwrap();
+        let mut y_from_u64_diff = x_values;
+        u64_diff.apply(&mut y_from_u64_diff).unwrap();

-        assert_eq!(y_values, y_from_xor);
+        assert_eq!(y_values, y_from_u64_diff);

         let bytes_diff = BytesDiff::compute(&x_bytes, &y_bytes).unwrap();
@@ -343,7 +364,7 @@
         assert_eq!(y_bytes, y_from_bytes);

-        // XOR diff wins by more than a factor of 3
-        assert!(xor_diff.bytes.len() < 3 * bytes_diff.bytes.len());
+        // U64 diff wins by more than a factor of 3
+        assert!(u64_diff.bytes.len() < 3 * bytes_diff.bytes.len());
     }
 }
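Annotation: to make the slot arithmetic of `storage_strategy` concrete: with the default exponents, a slot aligned to 2^21 gets a snapshot; a slot aligned to a smaller modulus diffs against the previous point on the layer above; anything else is replayed from the nearest 2^5 boundary. A standalone sketch under those assumptions (names and the plain-`u64` types are illustrative, not the Lighthouse API):

```rust
// Standalone sketch of the storage-strategy lookup for the default exponents.
#[derive(Debug, PartialEq)]
enum Strategy {
    Snapshot,
    DiffFrom(u64),
    ReplayFrom(u64),
}

fn storage_strategy(moduli: &[u64], slot: u64) -> Strategy {
    let first = moduli[0];
    let last = *moduli.last().unwrap();
    if slot % last == 0 {
        return Strategy::Snapshot;
    }
    // Check layers from coarsest to finest; `w = [n_small, n_big]` in the
    // ascending list, so reversed iteration starts at the top of the hierarchy.
    for w in moduli.windows(2).rev() {
        let (n_small, n_big) = (w[0], w[1]);
        if slot % n_small == 0 {
            // Diff against the previous point on the layer above.
            return Strategy::DiffFrom(slot / n_big * n_big);
        }
    }
    // Not on any diff boundary: replay blocks from the finest layer below.
    Strategy::ReplayFrom(slot / first * first)
}

fn main() {
    // moduli = 2^e for the default exponents [5, 9, 11, 13, 16, 18, 21].
    let moduli: Vec<u64> = [5, 9, 11, 13, 16, 18, 21]
        .iter()
        .map(|e| 1u64 << e)
        .collect();

    assert_eq!(storage_strategy(&moduli, 0), Strategy::Snapshot);
    // 2^18-aligned slots diff directly against the enclosing snapshot.
    assert_eq!(storage_strategy(&moduli, 1 << 18), Strategy::DiffFrom(0));
    // Unaligned slots replay from the nearest 2^5 boundary below.
    assert_eq!(
        storage_strategy(&moduli, (1 << 18) + 33),
        Strategy::ReplayFrom((1 << 18) + 32)
    );
}
```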
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 7321722a746..68703a4d262 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -79,7 +79,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
     #[allow(dead_code)]
     historic_state_cache: Mutex<LruCache<Slot, BeaconState<E>>>,
     /// Cache of hierarchical diff buffers.
-    diff_buffer_cache: Mutex<LruCache<Epoch, HDiffBuffer>>,
+    diff_buffer_cache: Mutex<LruCache<Slot, HDiffBuffer>>,
     // Cache of hierarchical diffs.
     // FIXME(sproul): see if this is necessary
     /// Chain spec.
@@ -113,7 +113,7 @@ pub enum HotColdDBError {
     MissingPrevState(Hash256),
     MissingSplitState(Hash256, Slot),
     MissingStateDiff(Hash256),
-    MissingHDiff(Epoch),
+    MissingHDiff(Slot),
     MissingExecutionPayload(Hash256),
     MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
     MissingAnchorInfo,
@@ -1403,17 +1403,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     ) -> Result<(), Error> {
         self.store_cold_state_summary(state_root, state.slot(), ops)?;

-        if state.slot() % E::slots_per_epoch() != 0 {
-            return Ok(());
-        }
-
-        let epoch = state.current_epoch();
-        match self.hierarchy.storage_strategy(epoch)? {
-            StorageStrategy::Nothing => {
+        let slot = state.slot();
+        match self.hierarchy.storage_strategy(slot)? {
+            StorageStrategy::ReplayFrom(from) => {
                 debug!(
                     self.log,
                     "Storing cold state";
                     "strategy" => "replay",
+                    "from_slot" => from,
                     "slot" => state.slot(),
                 );
             }
@@ -1431,6 +1428,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                     self.log,
                     "Storing cold state";
                     "strategy" => "diff",
+                    "from_slot" => from,
                     "slot" => state.slot(),
                 );
                 self.store_cold_state_as_diff(state, from, ops)?;
@@ -1453,22 +1451,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         encoder.write_all(&bytes).map_err(Error::Compression)?;
         encoder.finish().map_err(Error::Compression)?;

-        let epoch = state.current_epoch();
         let key = get_key_for_col(
             DBColumn::BeaconStateSnapshot.into(),
-            &epoch.as_u64().to_be_bytes(),
+            &state.slot().as_u64().to_be_bytes(),
         );
         ops.push(KeyValueStoreOp::PutKeyValue(key, compressed_value));
         Ok(())
     }

-    pub fn load_cold_state_bytes_as_snapshot(
-        &self,
-        epoch: Epoch,
-    ) -> Result<Option<Vec<u8>>, Error> {
+    pub fn load_cold_state_bytes_as_snapshot(&self, slot: Slot) -> Result<Option<Vec<u8>>, Error> {
         match self.cold_db.get_bytes(
             DBColumn::BeaconStateSnapshot.into(),
-            &epoch.as_u64().to_be_bytes(),
+            &slot.as_u64().to_be_bytes(),
         )? {
             Some(bytes) => {
                 let mut ssz_bytes =
@@ -1483,12 +1477,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         }
     }

-    pub fn load_cold_state_as_snapshot(
-        &self,
-        epoch: Epoch,
-    ) -> Result<Option<BeaconState<E>>, Error> {
+    pub fn load_cold_state_as_snapshot(&self, slot: Slot) -> Result<Option<BeaconState<E>>, Error> {
         Ok(self
-            .load_cold_state_bytes_as_snapshot(epoch)?
+            .load_cold_state_bytes_as_snapshot(slot)?
             .map(|bytes| BeaconState::from_ssz_bytes(&bytes, &self.spec))
             .transpose()?)
     }
@@ -1496,18 +1487,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     pub fn store_cold_state_as_diff(
         &self,
         state: &BeaconState<E>,
-        from_epoch: Epoch,
+        from_slot: Slot,
         ops: &mut Vec<KeyValueStoreOp>,
     ) -> Result<(), Error> {
         // Load diff base state bytes.
-        let base_buffer = self.load_hdiff_buffer_for_epoch(from_epoch)?;
+        let (_, base_buffer) = self.load_hdiff_buffer_for_slot(from_slot)?;
         let target_buffer = HDiffBuffer::from_state(state.clone());
         let diff = HDiff::compute(&base_buffer, &target_buffer)?;
         let diff_bytes = diff.as_ssz_bytes();

         let key = get_key_for_col(
             DBColumn::BeaconStateDiff.into(),
-            &state.current_epoch().as_u64().to_be_bytes(),
+            &state.slot().as_u64().to_be_bytes(),
         );
         ops.push(KeyValueStoreOp::PutKeyValue(key, diff_bytes));
         Ok(())
@@ -1527,10 +1518,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     ///
     /// Will reconstruct the state if it lies between restore points.
     pub fn load_cold_state_by_slot(&self, slot: Slot) -> Result<Option<BeaconState<E>>, Error> {
-        let epoch = slot.epoch(E::slots_per_epoch());
-
-        let hdiff_buffer = self.load_hdiff_buffer_for_epoch(epoch)?;
+        let (base_slot, hdiff_buffer) = self.load_hdiff_buffer_for_slot(slot)?;
         let base_state = hdiff_buffer.into_state(&self.spec)?;
+        debug_assert_eq!(base_slot, base_state.slot());

         if base_state.slot() == slot {
             return Ok(Some(base_state));
@@ -1548,56 +1538,68 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             .map(Some)
     }

-    fn load_hdiff_for_epoch(&self, epoch: Epoch) -> Result<HDiff, Error> {
+    fn load_hdiff_for_slot(&self, slot: Slot) -> Result<HDiff, Error> {
         self.cold_db
             .get_bytes(
                 DBColumn::BeaconStateDiff.into(),
-                &epoch.as_u64().to_be_bytes(),
+                &slot.as_u64().to_be_bytes(),
             )?
             .map(|bytes| HDiff::from_ssz_bytes(&bytes))
-            .ok_or(HotColdDBError::MissingHDiff(epoch))?
+            .ok_or(HotColdDBError::MissingHDiff(slot))?
             .map_err(Into::into)
     }

-    fn load_hdiff_buffer_for_epoch(&self, epoch: Epoch) -> Result<HDiffBuffer, Error> {
-        if let Some(buffer) = self.diff_buffer_cache.lock().get(&epoch) {
+    /// Returns `HDiffBuffer` for the specified slot, or `HDiffBuffer` for the `ReplayFrom` slot if
+    /// the diff for the specified slot is not stored.
+    fn load_hdiff_buffer_for_slot(&self, slot: Slot) -> Result<(Slot, HDiffBuffer), Error> {
+        if let Some(buffer) = self.diff_buffer_cache.lock().get(&slot) {
             debug!(
                 self.log,
                 "Hit diff buffer cache";
-                "epoch" => epoch
+                "slot" => slot
             );
-            return Ok(buffer.clone());
+            return Ok((slot, buffer.clone()));
         }

         // Load buffer for the previous state.
         // This amount of recursion (<10 levels) should be OK.
         let t = std::time::Instant::now();
-        let mut buffer = match self.hierarchy.storage_strategy(epoch)? {
+        let (_buffer_slot, mut buffer) = match self.hierarchy.storage_strategy(slot)? {
             // Base case.
             StorageStrategy::Snapshot => {
                 let state = self
-                    .load_cold_state_as_snapshot(epoch)?
-                    .ok_or(Error::MissingSnapshot(epoch))?;
-                return Ok(HDiffBuffer::from_state(state));
+                    .load_cold_state_as_snapshot(slot)?
+                    .ok_or(Error::MissingSnapshot(slot))?;
+                let buffer = HDiffBuffer::from_state(state);
+
+                self.diff_buffer_cache.lock().put(slot, buffer.clone());
+                debug!(
+                    self.log,
+                    "Added diff buffer to cache";
+                    "load_time_ms" => t.elapsed().as_millis(),
+                    "slot" => slot
+                );
+
+                return Ok((slot, buffer));
             }
             // Recursive case.
-            StorageStrategy::DiffFrom(from) => self.load_hdiff_buffer_for_epoch(from)?,
-            StorageStrategy::Nothing => unreachable!("FIXME(sproul)"),
+            StorageStrategy::DiffFrom(from) => self.load_hdiff_buffer_for_slot(from)?,
+            StorageStrategy::ReplayFrom(from) => return self.load_hdiff_buffer_for_slot(from),
         };

         // Load diff and apply it to buffer.
-        let diff = self.load_hdiff_for_epoch(epoch)?;
+        let diff = self.load_hdiff_for_slot(slot)?;
         diff.apply(&mut buffer)?;

-        self.diff_buffer_cache.lock().put(epoch, buffer.clone());
+        self.diff_buffer_cache.lock().put(slot, buffer.clone());
         debug!(
             self.log,
             "Added diff buffer to cache";
             "load_time_ms" => t.elapsed().as_millis(),
-            "epoch" => epoch
+            "slot" => slot
         );

-        Ok(buffer)
+        Ok((slot, buffer))
     }

     /// Load cold blocks between `start_slot` and `end_slot` inclusive.
@@ -1741,14 +1743,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     /// Initialise the anchor info for checkpoint sync starting from `block`.
     pub fn init_anchor_info(&self, block: BeaconBlockRef<'_, E>) -> Result<KeyValueStoreOp, Error> {
         let anchor_slot = block.slot();
-        let anchor_epoch = anchor_slot.epoch(E::slots_per_epoch());

         // Set the `state_upper_limit` to the slot of the *next* checkpoint.
         // See `get_state_upper_limit` for rationale.
-        let next_snapshot_slot = self
-            .hierarchy
-            .next_snapshot_epoch(anchor_epoch)?
-            .start_slot(E::slots_per_epoch());
+        let next_snapshot_slot = self.hierarchy.next_snapshot_slot(anchor_slot)?;
         let anchor_info = AnchorInfo {
             anchor_slot,
             oldest_block_slot: anchor_slot,
@@ -2219,15 +2217,19 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(

         let mut cold_db_ops: Vec<KeyValueStoreOp> = Vec::new();

-        if slot % E::slots_per_epoch() == 0 {
+        // Only store the cold state if it's on a diff boundary.
+        if matches!(
+            store.hierarchy.storage_strategy(slot)?,
+            StorageStrategy::ReplayFrom(..)
+        ) {
+            // Store slot -> state_root and state_root -> slot mappings.
+            store.store_cold_state_summary(&state_root, slot, &mut cold_db_ops)?;
+        } else {
             let state: BeaconState<E> = store
                 .get_hot_state(&state_root)?
                 .ok_or(HotColdDBError::MissingStateToFreeze(state_root))?;

             store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;
-        } else {
-            // Store slot -> state_root and state_root -> slot mappings.
-            store.store_cold_state_summary(&state_root, slot, &mut cold_db_ops)?;
         }

         // There are data dependencies between calls to `store_cold_state()` that prevent us from
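Annotation: the recursive reconstruction in `load_hdiff_buffer_for_slot` is the core read path: walk down the hierarchy until a snapshot is hit, then apply one diff per layer on the way back up, returning early with the nearest base buffer when the slot only has a `ReplayFrom` entry. A simplified sketch of that control flow (hypothetical `Store` trait and `Buffer` type; caching, errors, and timing omitted):

```rust
// Simplified, non-Lighthouse sketch of the recursive buffer reconstruction.
#[derive(Clone)]
struct Buffer(Vec<u8>); // stand-in for HDiffBuffer

enum Strategy {
    Snapshot,
    DiffFrom(u64),
    ReplayFrom(u64),
}

trait Store {
    fn strategy(&self, slot: u64) -> Strategy;
    fn load_snapshot(&self, slot: u64) -> Buffer;
    fn apply_diff(&self, slot: u64, buffer: &mut Buffer);
}

/// Returns the buffer for `slot`, or for the nearest replayable base slot.
/// Recursion depth is bounded by the number of hierarchy layers.
fn load_buffer(store: &impl Store, slot: u64) -> (u64, Buffer) {
    match store.strategy(slot) {
        // Base case: a full snapshot is stored at this slot.
        Strategy::Snapshot => (slot, store.load_snapshot(slot)),
        // Recursive case: rebuild the base buffer, then apply this slot's diff.
        Strategy::DiffFrom(base) => {
            let (_, mut buffer) = load_buffer(store, base);
            store.apply_diff(slot, &mut buffer);
            (slot, buffer)
        }
        // No diff stored at this slot: hand back the closest buffer below and
        // let the caller replay blocks from there.
        Strategy::ReplayFrom(base) => load_buffer(store, base),
    }
}
```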
diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs
index 6cba35b5edf..3f151142168 100644
--- a/lighthouse/tests/beacon_node.rs
+++ b/lighthouse/tests/beacon_node.rs
@@ -15,6 +15,7 @@ use std::process::Command;
 use std::str::FromStr;
 use std::string::ToString;
 use std::time::Duration;
+use store::hdiff::HierarchyConfig;
 use tempfile::TempDir;
 use types::{
     Address, Checkpoint, Epoch, ExecutionBlockHash, ForkName, Hash256, MainnetEthSpec,
@@ -446,6 +447,35 @@ fn eth1_cache_follow_distance_manual() {
             assert_eq!(config.eth1.cache_follow_distance(), 128);
         });
 }
+#[test]
+fn hierarchy_exponents_default() {
+    CommandLineTest::new()
+        .run_with_zero_port()
+        .with_config(|config| {
+            assert_eq!(config.store.hierarchy_config, HierarchyConfig::default());
+        });
+}
+#[test]
+fn hierarchy_exponents_valid() {
+    CommandLineTest::new()
+        .flag("hierarchy-exponents", Some("3,6,9,12"))
+        .run_with_zero_port()
+        .with_config(|config| {
+            assert_eq!(
+                config.store.hierarchy_config,
+                HierarchyConfig {
+                    exponents: vec![3, 6, 9, 12]
+                }
+            );
+        });
+}
+#[test]
+#[should_panic]
+fn hierarchy_exponents_invalid_order() {
+    CommandLineTest::new()
+        .flag("hierarchy-exponents", Some("7,6,9,12"))
+        .run_with_zero_port();
+}

 // Tests for Bellatrix flags.
 fn run_merge_execution_endpoints_flag_test(flag: &str) {
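Closing annotation on `CompressedU64Diff` from `hdiff.rs` above: the diff only shows that the element-wise difference of two `u64` arrays is serialized and zstd-compressed, and that deletions are unsupported. The sketch below assumes a wrapping-subtraction delta encoding purely for illustration; it shows why balance arrays, which change by small amounts each epoch, compress so well.

```rust
// Illustrative sketch of a compressed u64-array diff in the spirit of
// `CompressedU64Diff`. The delta encoding is an assumption; only the
// "no deletions" rule and the zstd compression are visible in the diff above.
use std::io::Write;

fn compute(xs: &[u64], ys: &[u64]) -> std::io::Result<Vec<u8>> {
    assert!(xs.len() <= ys.len(), "deletions not supported");
    // Delta-encode: aligned elements change little, so most bytes are zero,
    // which zstd compresses extremely well.
    let deltas: Vec<u8> = ys
        .iter()
        .enumerate()
        .flat_map(|(i, y)| {
            let x = xs.get(i).copied().unwrap_or(0);
            y.wrapping_sub(x).to_be_bytes()
        })
        .collect();
    let mut encoder = zstd::Encoder::new(Vec::new(), 1)?;
    encoder.write_all(&deltas)?;
    encoder.finish()
}
```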