Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Oct 31, 2024
1 parent d4f23e0 commit aa854ec
Show file tree
Hide file tree
Showing 7 changed files with 340 additions and 42 deletions.
11 changes: 6 additions & 5 deletions beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,11 +708,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
StoreOp::DeleteSyncCommitteeBranch(block_root),
]
})
.chain(
abandoned_states
.into_iter()
.map(|(slot, state_hash)| StoreOp::DeleteState(state_hash.into(), Some(slot))),
)
.chain(abandoned_states.into_iter().flat_map(|(slot, state_hash)| {
[
StoreOp::DeleteState(state_hash.into(), Some(slot)),
StoreOp::BeaconStateHotDiff(state_hash.into()),
]
}))
.collect();

// Persist the head in case the process is killed or crashes here. This prevents
Expand Down
8 changes: 8 additions & 0 deletions beacon_node/beacon_chain/src/schema_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
mod migration_schema_v20;
mod migration_schema_v21;
mod migration_schema_v22;
mod migration_schema_v23;

use crate::beacon_chain::BeaconChainTypes;
use slog::Logger;
Expand Down Expand Up @@ -65,5 +66,12 @@ pub fn migrate_schema<T: BeaconChainTypes>(
current_version: from,
}
.into()),
(SchemaVersion(22), SchemaVersion(23)) => {
let ops = migration_schema_v23::upgrade_to_v23::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(23), SchemaVersion(22)) => {
todo!("downgrade");
}
}
}
137 changes: 137 additions & 0 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use crate::beacon_chain::BeaconChainTypes;
use crate::validator_pubkey_cache::DatabasePubkey;
use slog::{info, Logger};
use ssz::{Decode, Encode};
use std::{
    sync::Arc,
    time::{Duration, Instant},
};
use store::{
    get_key_for_col,
    hdiff::StorageStrategy,
    hot_cold_store::{get_full_state, HotColdDBError, StorageStrategyHot},
    DBColumn, Error, HotColdDB, HotStateSummary, KeyValueStore, KeyValueStoreOp, StoreItem,
};
use types::{EthSpec, Hash256, PublicKey, Slot};

const LOG_EVERY: usize = 200_000;

/// On-disk layout of a hot state summary as stored by schema v22 and earlier.
///
/// Decoded from the `BeaconStateSummary` column during the v22 -> v23 migration so
/// each entry can be rewritten in the v23 format.
#[derive(Debug, Clone, Copy, Encode, Decode)]
pub struct HotStateSummaryV22 {
    // Slot of the state this summary describes.
    slot: Slot,
    // Root of the latest block applied to the state.
    latest_block_root: Hash256,
    // Root of the state at this state's epoch boundary; v22's replay anchor.
    epoch_boundary_state_root: Hash256,
}

pub fn upgrade_to_v23<T: BeaconChainTypes>(
db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!(log, "Upgrading from v22 to v23");

// Upgrade all hot DB state summaries to the new type:
// - Set all summaries of boundary states as `Snapshot` type
// - Set all others are `Replay` pointing to `epoch_boundary_state_root`

let mut other_ops = vec![];
let mut store_diff_ops = vec![];
let mut diffs_written = 0;
let mut last_log_time = Instant::now();

// Iterate through all pubkeys and decompress them.
for (i, res) in db
.hot_db
.iter_column::<Hash256>(DBColumn::BeaconStateSummary)
.enumerate()
{
let (key, value) = res?;
let state_root: Hash256 = key.into();
let summary = HotStateSummaryV22::from_ssz_bytes(&value)?;
let storage_strategy = if summary.slot % T::EthSpec::slots_per_epoch() == 0 {
// boundary state
// - Read state from DB
// - Compute diff
// - Commit immediately in separate diff column
// - Schedule state deletion as part of atomic ops
let state = get_full_state(&self.hot_db, &state_root, &self.spec)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(state_root))?;

let storage_strategy = match self.hierarchy_hot.storage_strategy(slot)? {
StorageStrategy::ReplayFrom(from) => {
let from_root = *state
.get_state_root(from_slot)
.map_err(HotColdDBError::HotStateSummaryError)?;
StorageStrategyHot::ReplayFrom(from_root)
}
StorageStrategy::Snapshot => StorageStrategyHot::Snapshot(0),
StorageStrategy::DiffFrom(from) => {
// Compute and store diff, and delete the state

// TOOD: Find a cleaner way to de-duplicate the code to compute diffs on
// migration
// TODO: Ensure that the other of diffs is optimal, i.e. follow topological
// order to not re-compute intermediary values
db.store_hot_state_separate_ops(
&state_root,
state,
&mut vec![],
&mut store_diff_ops,
)?;

let from_root = *state
.get_state_root(from_slot)
.map_err(HotColdDBError::HotStateSummaryError)?;
StorageStrategyHot::DiffFrom(from_root)
}
};

let should_delete_state = match storage_strategy {
StorageStrategyHot::ReplayFrom(_) => true, // State no longer needed, we will replay
StorageStrategyHot::Snapshot(_) => false, // Keep state, need for snapshot
StorageStrategyHot::DiffFrom(_) => true, // State no longer needed, we will diff
};

if should_delete_state {
// Note: Do not delete the BeaconStateSummary as this will be over-written below
// TODO: Should also delete the temporary flag?
let state_key =
get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice());
other_ops.push(KeyValueStoreOp::DeleteKey(state_key));
}

storage_strategy
} else {
// TODO: Clip to anchor / finalized, if not aligned
StorageStrategyHot::ReplayFrom(summary.epoch_boundary_state_root)
};

let new_summary = HotStateSummary {
slot: summary.slot,
latest_block_root: summary.latest_block_root,
storage_strategy,
};
ops.push(new_summary.as_kv_store_op(key));

if last_log_time.elapsed() > Duration::from_secs(5) {
last_log_time = Instant::now();

diffs_written += store_diff_ops.len();
// Commit diff ops once in a while to not blow up memory
// Commit diff ops before deleting states to not lose data.
db.hot_db.do_atomically(store_diff_ops)?;
store_diff_ops = vec![];

// TODO: Display the slot distance between head and finalized, and head-tracker count
info!(
log,
"Hot state migration in progress";
"summaries_migrated" => i,
"diff_written" => diffs_written
);
}
}

// TODO: Should run hot DB compaction after deleting potentially a lot of states. Or should wait
// for the next finality event?
info!(log, "Hot state migration complete");

Ok(ops)
}
32 changes: 26 additions & 6 deletions beacon_node/store/src/hdiff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use itertools::Itertools;
use serde::{Deserialize, Serialize};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use std::cmp::Ordering;
use std::io::{Read, Write};
use std::ops::RangeInclusive;
use std::str::FromStr;
Expand All @@ -20,6 +21,7 @@ pub enum Error {
Compression(std::io::Error),
InvalidSszState(ssz::DecodeError),
InvalidBalancesLength,
LessThanStart,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Encode, Decode)]
Expand Down Expand Up @@ -67,6 +69,10 @@ impl FromStr for HierarchyConfig {
/// Concrete slot periods for the state-diff hierarchy, expanded from
/// `HierarchyConfig::exponents` (each exponent `n` yields a layer of period `2^n` slots).
#[derive(Debug)]
pub struct HierarchyModuli {
    /// Per-layer periods in slots. The first entry is the most frequent (replay) layer;
    /// the last entry is the full-snapshot interval.
    moduli: Vec<u64>,
    /// Initial snapshot point, which may not be aligned to the hierarchy moduli values. Given an
    /// example of exponents [5,13,21], to reconstruct state at slot 3,000,003: if start = 3,000,002
    /// layer 2 diff will point to the start snapshot instead of the layer 1 diff at 2998272.
    start: Slot,
}

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -276,10 +282,10 @@ impl Default for HierarchyConfig {
}

impl HierarchyConfig {
pub fn to_moduli(&self) -> Result<HierarchyModuli, Error> {
pub fn to_moduli(&self, start: Slot) -> Result<HierarchyModuli, Error> {
self.validate()?;
let moduli = self.exponents.iter().map(|n| 1 << n).collect();
Ok(HierarchyModuli { moduli })
Ok(HierarchyModuli { moduli, start })
}

pub fn validate(&self) -> Result<(), Error> {
Expand All @@ -299,6 +305,12 @@ impl HierarchyConfig {

impl HierarchyModuli {
pub fn storage_strategy(&self, slot: Slot) -> Result<StorageStrategy, Error> {
match slot.cmp(&self.start) {
Ordering::Less => return Err(Error::LessThanStart),
Ordering::Equal => return Ok(StorageStrategy::Snapshot),
Ordering::Greater => {} // continue
}

// last = full snapshot interval
let last = self.moduli.last().copied().ok_or(Error::InvalidHierarchy)?;
// first = most frequent diff layer, need to replay blocks from this layer
Expand All @@ -320,14 +332,22 @@ impl HierarchyModuli {
.find_map(|(&n_big, &n_small)| {
if slot % n_small == 0 {
// Diff from the previous layer.
Some(StorageStrategy::DiffFrom(slot / n_big * n_big))
let from = slot / n_big * n_big;
// Or from start point
let from = std::cmp::max(from, self.start);
Some(StorageStrategy::DiffFrom(from))
} else {
// Keep trying with next layer
None
}
})
// Exhausted layers, need to replay from most frequent layer
.unwrap_or(StorageStrategy::ReplayFrom(slot / first * first)))
.unwrap_or_else(|| {
let from = slot / first * first;
// Or from start point
let from = std::cmp::max(from, self.start);
StorageStrategy::ReplayFrom(from)
}))
}

/// Return the smallest slot greater than or equal to `slot` at which a full snapshot should
Expand Down Expand Up @@ -396,7 +416,7 @@ mod tests {
let config = HierarchyConfig::default();
config.validate().unwrap();

let moduli = config.to_moduli().unwrap();
let moduli = config.to_moduli(Slot::new(0)).unwrap();

// Full snapshots at multiples of 2^21.
let snapshot_freq = Slot::new(1 << 21);
Expand Down Expand Up @@ -432,7 +452,7 @@ mod tests {
let config = HierarchyConfig::default();
config.validate().unwrap();

let moduli = config.to_moduli().unwrap();
let moduli = config.to_moduli(Slot::new(0)).unwrap();
let snapshot_freq = Slot::new(1 << 21);

assert_eq!(
Expand Down
Loading

0 comments on commit aa854ec

Please sign in to comment.