diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs
index 37a2e8917ba..d9fb3931409 100644
--- a/beacon_node/beacon_chain/src/migrate.rs
+++ b/beacon_node/beacon_chain/src/migrate.rs
@@ -708,11 +708,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Hot, Cold>
                 current_version: from,
             }
             .into()),
+            (SchemaVersion(22), SchemaVersion(23)) => {
+                let ops = migration_schema_v23::upgrade_to_v23::<T>(db.clone(), log)?;
+                db.store_schema_version_atomically(to, ops)
+            }
+            (SchemaVersion(23), SchemaVersion(22)) => {
+                todo!("downgrade");
+            }
         }
     }
diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs
new file mode 100644
index 00000000000..73364d4de12
--- /dev/null
+++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs
@@ -0,0 +1,137 @@
+use crate::beacon_chain::BeaconChainTypes;
+use crate::validator_pubkey_cache::DatabasePubkey;
+use slog::{info, Logger};
+use ssz::{Decode, Encode};
+use ssz_derive::{Decode, Encode};
+use std::{
+    sync::Arc,
+    time::{Duration, Instant},
+};
+use store::{
+    get_key_for_col,
+    hdiff::StorageStrategy,
+    hot_cold_store::{get_full_state, HotColdDBError, StorageStrategyHot},
+    DBColumn, Error, HotColdDB, HotStateSummary, KeyValueStore, KeyValueStoreOp, StoreItem,
+};
+use types::{EthSpec, Hash256, PublicKey, Slot};
+
+const LOG_EVERY: usize = 200_000;
+
+#[derive(Debug, Clone, Copy, Encode, Decode)]
+pub struct HotStateSummaryV22 {
+    slot: Slot,
+    latest_block_root: Hash256,
+    epoch_boundary_state_root: Hash256,
+}
+
+pub fn upgrade_to_v23<T: BeaconChainTypes>(
+    db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
+    log: Logger,
+) -> Result<Vec<KeyValueStoreOp>, Error> {
+    info!(log, "Upgrading from v22 to v23");
+
+    // Upgrade all hot DB state summaries to the new type:
+    // - Summaries of epoch-boundary states get a `Snapshot`, `DiffFrom` or `ReplayFrom` strategy
+    //   according to the hot hierarchy
+    // - All other summaries get `ReplayFrom` pointing to `epoch_boundary_state_root`
+
+    let mut other_ops = vec![];
+    let mut store_diff_ops = vec![];
+    let mut diffs_written = 0;
+    let mut last_log_time = Instant::now();
+
+    // Iterate through all hot state summaries and re-write them with a storage strategy.
+    for (i, res) in db
+        .hot_db
+        .iter_column::<Hash256>(DBColumn::BeaconStateSummary)
+        .enumerate()
+    {
+        let (key, value) = res?;
+        let state_root: Hash256 = key.into();
+        let summary = HotStateSummaryV22::from_ssz_bytes(&value)?;
+        let storage_strategy = if summary.slot % T::EthSpec::slots_per_epoch() == 0 {
+            // Boundary state:
+            // - Read state from DB
+            // - Compute diff
+            // - Commit immediately in separate diff column
+            // - Schedule state deletion as part of atomic ops
+            let state = get_full_state(&db.hot_db, &state_root, &db.spec)?
+                .ok_or(HotColdDBError::MissingEpochBoundaryState(state_root))?;
+
+            let storage_strategy = match db.hierarchy_hot.storage_strategy(summary.slot)? {
+                StorageStrategy::ReplayFrom(from) => {
+                    let from_root = *state
+                        .get_state_root(from)
+                        .map_err(HotColdDBError::HotStateSummaryError)?;
+                    StorageStrategyHot::ReplayFrom(from_root)
+                }
+                StorageStrategy::Snapshot => StorageStrategyHot::Snapshot(0),
+                StorageStrategy::DiffFrom(from) => {
+                    // Compute and store diff, and delete the state
+
+                    // TODO: Find a cleaner way to de-duplicate the code to compute diffs on
+                    // migration
+                    // TODO: Ensure that the order of diffs is optimal, i.e. follow topological
+                    // order to not re-compute intermediary values
+                    db.store_hot_state_separate_ops(
+                        &state_root,
+                        &state,
+                        &mut vec![],
+                        &mut store_diff_ops,
+                    )?;
+
+                    let from_root = *state
+                        .get_state_root(from)
+                        .map_err(HotColdDBError::HotStateSummaryError)?;
+                    StorageStrategyHot::DiffFrom(from_root)
+                }
+            };
+
+            let should_delete_state = match storage_strategy {
+                StorageStrategyHot::ReplayFrom(_) => true, // State no longer needed, we will replay
+                StorageStrategyHot::Snapshot(_) => false,  // Keep state, needed for the snapshot
+                StorageStrategyHot::DiffFrom(_) => true,   // State no longer needed, we will diff
+            };
+
+            if should_delete_state {
+                // Note: Do not delete the BeaconStateSummary as it will be overwritten below
+                // TODO: Should also delete the temporary flag?
+                let state_key =
+                    get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice());
+                other_ops.push(KeyValueStoreOp::DeleteKey(state_key));
+            }
+
+            storage_strategy
+        } else {
+            // TODO: Clip to anchor / finalized, if not aligned
+            StorageStrategyHot::ReplayFrom(summary.epoch_boundary_state_root)
+        };
+
+        let new_summary = HotStateSummary {
+            slot: summary.slot,
+            latest_block_root: summary.latest_block_root,
+            storage_strategy,
+        };
+        other_ops.push(new_summary.as_kv_store_op(key));
+
+        if last_log_time.elapsed() > Duration::from_secs(5) {
+            last_log_time = Instant::now();
+
+            diffs_written += store_diff_ops.len();
+            // Commit diff ops from time to time so we don't blow up memory, and commit them
+            // before the state deletions in `other_ops` are applied so we never lose data.
+            db.hot_db.do_atomically(store_diff_ops)?;
+            store_diff_ops = vec![];
+
+            // TODO: Display the slot distance between head and finalized, and head-tracker count
+            info!(
+                log,
+                "Hot state migration in progress";
+                "summaries_migrated" => i,
+                "diffs_written" => diffs_written
+            );
+        }
+    }
+
+    // Commit any remaining diff ops before the state deletions in `other_ops` are applied.
+    db.hot_db.do_atomically(store_diff_ops)?;
+
+    // TODO: Should run hot DB compaction after deleting potentially a lot of states. Or should we
+    // wait for the next finality event?
+    info!(log, "Hot state migration complete");
+
+    Ok(other_ops)
+}
diff --git a/beacon_node/store/src/hdiff.rs b/beacon_node/store/src/hdiff.rs
index 49b29de28ea..3f1c1f90533 100644
--- a/beacon_node/store/src/hdiff.rs
+++ b/beacon_node/store/src/hdiff.rs
@@ -4,6 +4,7 @@ use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use ssz::{Decode, Encode};
 use ssz_derive::{Decode, Encode};
+use std::cmp::Ordering;
 use std::io::{Read, Write};
 use std::ops::RangeInclusive;
 use std::str::FromStr;
@@ -20,6 +21,7 @@ pub enum Error {
     Compression(std::io::Error),
     InvalidSszState(ssz::DecodeError),
     InvalidBalancesLength,
+    LessThanStart,
 }

 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
@@ -67,6 +69,10 @@ impl FromStr for HierarchyConfig {
 #[derive(Debug)]
 pub struct HierarchyModuli {
     moduli: Vec<u64>,
+    /// Initial snapshot point, which may not be aligned to the hierarchy moduli values. For
+    /// example, with exponents [5, 13, 21] and start = 3,000,002, reconstructing the state at
+    /// slot 3,000,003 points back to the start snapshot instead of to the layer-1 diff at slot
+    /// 2,998,272.
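+    /// Worked example (illustrative numbers only, assuming the hypothetical exponents above):
+    /// the moduli are 32, 8_192 and 2_097_152. With `start = 0`, slot 3_000_003 replays from
+    /// 3_000_000, which diffs from 2_998_272 (= 366 * 8_192), which diffs from the snapshot at
+    /// 2_097_152. With `start = 3_000_002` the replay point 3_000_000 falls below `start`, so
+    /// the strategy is clamped to `ReplayFrom(3_000_002)`, i.e. straight back to the start
+    /// snapshot.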
+    start: Slot,
 }

 #[derive(Debug, PartialEq, Eq)]
@@ -276,10 +282,10 @@ impl Default for HierarchyConfig {
 }

 impl HierarchyConfig {
-    pub fn to_moduli(&self) -> Result<HierarchyModuli, Error> {
+    pub fn to_moduli(&self, start: Slot) -> Result<HierarchyModuli, Error> {
         self.validate()?;
         let moduli = self.exponents.iter().map(|n| 1 << n).collect();
-        Ok(HierarchyModuli { moduli })
+        Ok(HierarchyModuli { moduli, start })
     }

     pub fn validate(&self) -> Result<(), Error> {
@@ -299,6 +305,12 @@ impl HierarchyConfig {

 impl HierarchyModuli {
     pub fn storage_strategy(&self, slot: Slot) -> Result<StorageStrategy, Error> {
+        match slot.cmp(&self.start) {
+            Ordering::Less => return Err(Error::LessThanStart),
+            Ordering::Equal => return Ok(StorageStrategy::Snapshot),
+            Ordering::Greater => {} // continue
+        }
+
         // last = full snapshot interval
         let last = self.moduli.last().copied().ok_or(Error::InvalidHierarchy)?;
         // first = most frequent diff layer, need to replay blocks from this layer
@@ -320,14 +332,22 @@ impl HierarchyModuli {
             .find_map(|(&n_big, &n_small)| {
                 if slot % n_small == 0 {
                     // Diff from the previous layer.
-                    Some(StorageStrategy::DiffFrom(slot / n_big * n_big))
+                    let from = slot / n_big * n_big;
+                    // Or from the start point if it is more recent
+                    let from = std::cmp::max(from, self.start);
+                    Some(StorageStrategy::DiffFrom(from))
                 } else {
                     // Keep trying with next layer
                     None
                 }
             })
             // Exhausted layers, need to replay from most frequent layer
-            .unwrap_or(StorageStrategy::ReplayFrom(slot / first * first)))
+            .unwrap_or_else(|| {
+                let from = slot / first * first;
+                // Or from the start point if it is more recent
+                let from = std::cmp::max(from, self.start);
+                StorageStrategy::ReplayFrom(from)
+            }))
     }

     /// Return the smallest slot greater than or equal to `slot` at which a full snapshot should
@@ -396,7 +416,7 @@ mod tests {
         let config = HierarchyConfig::default();
         config.validate().unwrap();

-        let moduli = config.to_moduli().unwrap();
+        let moduli = config.to_moduli(Slot::new(0)).unwrap();

         // Full snapshots at multiples of 2^21.
         let snapshot_freq = Slot::new(1 << 21);
@@ -432,7 +452,7 @@ mod tests {
         let config = HierarchyConfig::default();
         config.validate().unwrap();

-        let moduli = config.to_moduli().unwrap();
+        let moduli = config.to_moduli(Slot::new(0)).unwrap();

         let snapshot_freq = Slot::new(1 << 21);
         assert_eq!(
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 376d521aeeb..f5f78c45327 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -62,6 +62,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
     data_column_info: RwLock<DataColumnInfo>,
     pub(crate) config: StoreConfig,
     pub(crate) hierarchy: HierarchyModuli,
+    pub hierarchy_hot: HierarchyModuli,
     /// Cold database containing compact historical data.
     pub cold_db: Cold,
     /// Database containing blobs. If None, store falls back to use `cold_db`.
@@ -204,7 +205,9 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
     ) -> Result<HotColdDB<E, MemoryStore<E>, MemoryStore<E>>, Error> {
         config.verify::<E>()?;

-        let hierarchy = config.hierarchy_config.to_moduli()?;
+        let hierarchy = config.hierarchy_config.to_moduli(Slot::new(0))?;
+        // TODO: Use different exponents
+        let hierarchy_hot = config.hierarchy_config.to_moduli(Slot::new(0))?;

         let db = HotColdDB {
             split: RwLock::new(Split::default()),
@@ -222,6 +225,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
             )),
             config,
             hierarchy,
+            hierarchy_hot,
             spec,
             log,
             _phantom: PhantomData,
@@ -247,7 +251,9 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
     ) -> Result<Arc<HotColdDB<E, LevelDB<E>, LevelDB<E>>>, Error> {
         config.verify::<E>()?;

-        let hierarchy = config.hierarchy_config.to_moduli()?;
+        let hierarchy = config.hierarchy_config.to_moduli(Slot::new(0))?;
+        // TODO: Use different exponents
+        let hierarchy_hot = config.hierarchy_config.to_moduli(Slot::new(0))?;

         let hot_db = LevelDB::open(hot_path)?;
         let anchor_info = RwLock::new(Self::load_anchor_info(&hot_db)?);
@@ -268,6 +274,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
             )),
             config,
             hierarchy,
+            hierarchy_hot,
             spec,
             log,
             _phantom: PhantomData,
@@ -894,14 +901,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         }
     }

-    pub fn put_state_summary(
-        &self,
-        state_root: &Hash256,
-        summary: HotStateSummary,
-    ) -> Result<(), Error> {
-        self.hot_db.put(state_root, &summary).map_err(Into::into)
-    }
-
     /// Store a state in the store.
     pub fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error> {
         self.put_state_possibly_temporary(state_root, state, false)
@@ -1241,6 +1240,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             );
             key_value_batch.push(KeyValueStoreOp::DeleteKey(state_temp_key));

+            // TODO: Should delete the diff under this state root if any
+
+            // TODO: Review under HDiff
             if slot.map_or(true, |slot| slot % E::slots_per_epoch() == 0) {
                 let state_key =
                     get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice());
@@ -1440,6 +1442,23 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         state_root: &Hash256,
         state: &BeaconState<E>,
         ops: &mut Vec<KeyValueStoreOp>,
+    ) -> Result<(), Error> {
+        let mut store_diff_ops = vec![];
+        self.store_hot_state_separate_ops(state_root, state, ops, &mut store_diff_ops)?;
+        ops.extend(store_diff_ops);
+        Ok(())
+    }
+
+    /// Store a post-finalization state efficiently in the hot database.
+    ///
+    /// On an epoch boundary, store a full state or a diff according to the hot hierarchy. On an
+    /// intermediate slot, store just a summary pointing back to the nearest epoch boundary.
+    pub fn store_hot_state_separate_ops(
+        &self,
+        state_root: &Hash256,
+        state: &BeaconState<E>,
+        other_ops: &mut Vec<KeyValueStoreOp>,
+        store_diff_ops: &mut Vec<KeyValueStoreOp>,
     ) -> Result<(), Error> {
         // Put the state in the cache.
         let block_root = state.get_latest_block_root(*state_root);
@@ -1461,27 +1480,75 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             return Ok(());
         }

-        // On the epoch boundary, store the full state.
-        if state.slot() % E::slots_per_epoch() == 0 {
-            trace!(
-                self.log,
-                "Storing full state on epoch boundary";
-                "slot" => state.slot().as_u64(),
-                "state_root" => format!("{:?}", state_root)
-            );
-            store_full_state(state_root, state, ops)?;
-        }
-
         // Store a summary of the state.
         // We store one even for the epoch boundary states, as we may need their slots
         // when doing a look up by state root.
         let hot_state_summary = HotStateSummary::new(state_root, state)?;
-        let op = hot_state_summary.as_kv_store_op(*state_root);
-        ops.push(op);
+        other_ops.push(hot_state_summary.as_kv_store_op(*state_root));
+
+        // On the epoch boundary, store the full state.
+        if state.slot() % E::slots_per_epoch() == 0 {
+            let slot = state.slot();
+            match self.hierarchy_hot.storage_strategy(slot)? {
+                StorageStrategy::ReplayFrom(from_slot) => {
+                    debug!(
+                        self.log,
+                        "Storing hot state";
+                        "strategy" => "replay",
+                        "from_slot" => from_slot,
+                        "slot" => slot,
+                    );
+                    // Already have persisted the state summary, don't persist anything else
+                }
+                StorageStrategy::Snapshot => {
+                    debug!(
+                        self.log,
+                        "Storing hot state";
+                        "strategy" => "snapshot",
+                        "slot" => slot,
+                    );
+                    store_full_state(state_root, state, other_ops)?;
+                }
+                StorageStrategy::DiffFrom(from_slot) => {
+                    debug!(
+                        self.log,
+                        "Storing hot state";
+                        "strategy" => "diff",
+                        "from_slot" => from_slot,
+                        "slot" => slot,
+                    );
+                    // TODO: Max distance in a diff layer must be less than SlotsPerHistoricalRoot
+                    let from_root = *state
+                        .get_state_root(from_slot)
+                        .map_err(HotColdDBError::HotStateSummaryError)?;
+
+                    self.store_hot_state_as_diff(state_root, state, from_root, store_diff_ops)?;
+                }
+            }
+        }

         Ok(())
     }

+    fn store_hot_state_as_diff(
+        &self,
+        state_root: &Hash256,
+        state: &BeaconState<E>,
+        from_root: Hash256,
+        ops: &mut Vec<KeyValueStoreOp>,
+    ) -> Result<(), Error> {
+        let base_buffer = self.load_hot_hdiff_buffer(from_root)?;
+        let target_buffer = HDiffBuffer::from_state(state.clone());
+        let diff = HDiff::compute(&base_buffer, &target_buffer, &self.config)?;
+        let diff_bytes = diff.as_ssz_bytes();
+        let key = get_key_for_col(DBColumn::BeaconStateHotDiff.into(), state_root.as_slice());
+        ops.push(KeyValueStoreOp::PutKeyValue(key, diff_bytes));
+        Ok(())
+    }
+
     /// Get a post-finalization state from the database or store.
     pub fn get_hot_state(&self, state_root: &Hash256) -> Result<Option<BeaconState<E>>, Error> {
         if let Some(state) = self.state_cache.lock().get_by_state_root(*state_root) {
@@ -1517,6 +1584,40 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         }
     }

+    fn load_hot_hdiff_buffer(&self, state_root: Hash256) -> Result<HDiffBuffer, Error> {
+        // TODO: Add cache of hot hdiff buffers
+
+        let Some(HotStateSummary {
+            slot,
+            latest_block_root,
+            storage_strategy,
+            ..
+        }) = self.load_hot_state_summary(&state_root)?
+        else {
+            return Err(Error::MissingState(state_root));
+        };
+
+        match storage_strategy {
+            StorageStrategyHot::Snapshot(_) => {
+                let state = get_full_state(&self.hot_db, &state_root, &self.spec)?
+                    .ok_or(HotColdDBError::MissingEpochBoundaryState(state_root.into()))?;
+                let buffer = HDiffBuffer::from_state(state.clone());
+                Ok(buffer)
+            }
+            StorageStrategyHot::DiffFrom(from) => {
+                let mut buffer = self.load_hot_hdiff_buffer(from)?;
+                let diff = self.load_hot_diff(state_root)?;
+                diff.apply(&mut buffer, &self.config)?;
+                Ok(buffer)
+            }
+            StorageStrategyHot::ReplayFrom(from) => self.load_hot_hdiff_buffer(from),
+        }
+    }
+
+    fn load_hot_diff(&self, _state_root: Hash256) -> Result<HDiff, Error> {
+        todo!();
+    }
+
     /// Load a post-finalization state from the hot database.
     ///
     /// Will replay blocks from the nearest epoch boundary.
@@ -1538,9 +1639,17 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         if let Some(HotStateSummary {
             slot,
             latest_block_root,
-            epoch_boundary_state_root,
+            storage_strategy,
         }) = self.load_hot_state_summary(state_root)?
         {
+            match storage_strategy {
+                StorageStrategyHot::Snapshot(_) | StorageStrategyHot::DiffFrom(_) => {
+                    let buffer = self.load_hot_hdiff_buffer(state_root)?;
+                }
+                StorageStrategyHot::ReplayFrom(from) => {
+                    let state = self.load_hot_state_something(state_root)?;
+                }
+            }
             let mut boundary_state =
                 get_full_state(&self.hot_db, &epoch_boundary_state_root, &self.spec)?.ok_or(
                     HotColdDBError::MissingEpochBoundaryState(epoch_boundary_state_root),
                 )?;
@@ -1554,6 +1663,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>

             // Optimization to avoid even *thinking* about replaying blocks if we're already
             // on an epoch boundary.
+            // TODO: review this condition
             let mut state = if slot % E::slots_per_epoch() == 0 {
                 boundary_state
             } else {
@@ -1634,7 +1744,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                     "Storing cold state";
                     "strategy" => "replay",
                     "from_slot" => from,
-                    "slot" => state.slot(),
+                    "slot" => slot,
                 );
                 // Already have persisted the state summary, don't persist anything else
             }
@@ -1643,7 +1753,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                     self.log,
                     "Storing cold state";
                     "strategy" => "snapshot",
-                    "slot" => state.slot(),
+                    "slot" => slot,
                 );
                 self.store_cold_state_as_snapshot(state, ops)?;
             }
@@ -1653,7 +1763,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                     "Storing cold state";
                     "strategy" => "diff",
                     "from_slot" => from,
-                    "slot" => state.slot(),
+                    "slot" => slot,
                 );
                 self.store_cold_state_as_diff(state, from, ops)?;
             }
@@ -3015,6 +3125,10 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
     let mut epoch_boundary_blocks = HashSet::new();
     let mut non_checkpoint_block_roots = HashSet::new();

+    // Compute the set of finalized state roots that we must keep to make the dynamic HDiff system
+    // work.
+    let required_finalized_diff_state_roots = HashSet::<Hash256>::new();
+
     // Iterate in descending order until the current split slot
     let state_roots = RootsIterator::new(&store, finalized_state)
         .take_while(|result| match result {
@@ -3059,7 +3173,9 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
         }

         // Delete the old summary, and the full state if we lie on an epoch boundary.
-        hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
+        if !required_finalized_diff_state_roots.contains(&state_root) {
+            hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
+        }

         // Do not try to store states if a restore point is yet to be stored, or will never be
         // stored (see `STATE_UPPER_LIMIT_NO_RETAIN`). Make an exception for the genesis state
@@ -3209,11 +3325,20 @@ fn no_state_root_iter() -> Option<std::iter::Empty<Result<(Hash256, Slot), Error>>>
 pub fn get_full_state<KV: KeyValueStore<E>, E: EthSpec>(
     db: &KV,
     state_root: &Hash256,
     spec: &ChainSpec,
 ) -> Result<Option<BeaconState<E>>, Error> {
     let total_timer = metrics::start_timer(&metrics::BEACON_STATE_READ_TIMES);

+    // Here we need to figure out if the state at `state_root` is stored as a diff or snapshot,
+    // and recurse.
+
     match db.get_bytes(DBColumn::BeaconState.into(), state_root.as_slice())? {
         Some(bytes) => {
             let overhead_timer = metrics::start_timer(&metrics::BEACON_STATE_READ_OVERHEAD_TIMES);
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index 0498c7c1e2c..2a7b7ac6dec 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -264,6 +264,9 @@ pub enum DBColumn {
     /// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
     #[strum(serialize = "ste")]
     BeaconState,
+    /// For compact `BeaconStateDiff`s in the hot DB.
+    #[strum(serialize = "bhd")]
+    BeaconStateHotDiff,
     /// For beacon state snapshots in the freezer DB.
     #[strum(serialize = "bsn")]
     BeaconStateSnapshot,
@@ -377,6 +380,7 @@ impl DBColumn {
             | Self::BeaconState
             | Self::BeaconBlob
             | Self::BeaconStateSummary
+            | Self::BeaconStateHotDiff
             | Self::BeaconColdStateSummary
             | Self::BeaconStateTemporary
             | Self::ExecPayload