Skip to content

Commit

Permalink
Remove checkpoint alignment requirements and enable historic state pr…
Browse files Browse the repository at this point in the history
…uning (sigp#4610)

Closes sigp#3210
Closes sigp#3211

- Checkpoint sync from the latest finalized state regardless of its alignment.
- Add the `block_root` to the database's split point. This is _only_ added to the in-memory split in order to avoid a schema migration. See `load_split`.
- Add a new method to the DB called `get_advanced_state`, which looks up a state _by block root_, with a `state_root` as fallback. Using this method prevents accidental accesses of the split's unadvanced state, which does not exist in the hot DB and is not guaranteed to exist in the freezer DB at all. Previously Lighthouse would look up this state _from the freezer DB_, even if it was required for block/attestation processing, which was suboptimal.
- Replace several state look-ups in block and attestation processing with `get_advanced_state` so that they can't hit the split block's unadvanced state.
- Do not store any states in the freezer database by default. All states will be deleted upon being evicted from the hot database unless `--reconstruct-historic-states` is set. The anchor info which was previously used for checkpoint sync is used to implement this, including when syncing from genesis.

Needs further testing. I want to stress-test the pruned database under Hydra.

The `get_advanced_state` method is intended to become more relevant over time: `tree-states` includes an identically named method that returns advanced states from its in-memory cache.

Co-authored-by: realbigsean <seananderson33@gmail.com>
  • Loading branch information
2 people authored and Woodpile37 committed Jan 6, 2024
1 parent 3e289d6 commit a9644f4
Show file tree
Hide file tree
Showing 22 changed files with 190 additions and 147 deletions.
13 changes: 8 additions & 5 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4305,6 +4305,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.log,
"Produced block on state";
"block_size" => block_size,
"slot" => block.slot(),
);

metrics::observe(&metrics::BLOCK_SIZE, block_size as f64);
Expand Down Expand Up @@ -5178,14 +5179,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt {
(state, state_root)
} else {
let state_root = head_block.state_root;
let state = self
let block_state_root = head_block.state_root;
let max_slot = shuffling_epoch.start_slot(T::EthSpec::slots_per_epoch());
let (state_root, state) = self
.store
.get_inconsistent_state_for_attestation_verification_only(
&state_root,
Some(head_block.slot),
&head_block_root,
max_slot,
block_state_root,
)?
.ok_or(Error::MissingBeaconState(head_block.state_root))?;
.ok_or(Error::MissingBeaconState(block_state_root))?;
(state, state_root)
};

Expand Down
12 changes: 10 additions & 2 deletions beacon_node/beacon_chain/src/beacon_fork_choice_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,17 @@ where
.deconstruct()
.0;

let state = self
let max_slot = self
.justified_checkpoint
.epoch
.start_slot(E::slots_per_epoch());
let (_, state) = self
.store
.get_state(&justified_block.state_root(), Some(justified_block.slot()))
.get_advanced_hot_state(
self.justified_checkpoint.root,
max_slot,
justified_block.state_root(),
)
.map_err(Error::FailedToReadState)?
.ok_or_else(|| Error::MissingState(justified_block.state_root()))?;

Expand Down
17 changes: 11 additions & 6 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1244,7 +1244,7 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {

// Perform a sanity check on the pre-state.
let parent_slot = parent.beacon_block.slot();
if state.slot() < parent_slot || state.slot() > parent_slot + 1 {
if state.slot() < parent_slot || state.slot() > block.slot() {
return Err(BeaconChainError::BadPreState {
parent_root: parent.beacon_block_root,
parent_slot,
Expand Down Expand Up @@ -1844,13 +1844,18 @@ fn load_parent<T: BeaconChainTypes>(
BlockError::from(BeaconChainError::MissingBeaconBlock(block.parent_root()))
})?;

// Load the parent blocks state from the database, returning an error if it is not found.
// Load the parent block's state from the database, returning an error if it is not found.
// It is an error because if we know the parent block we should also know the parent state.
let parent_state_root = parent_block.state_root();
let parent_state = chain
.get_state(&parent_state_root, Some(parent_block.slot()))?
// Retrieve any state that is advanced through to at most `block.slot()`: this is
// particularly important if `block` descends from the finalized/split block, but at a slot
// prior to the finalized slot (which is invalid and inaccessible in our DB schema).
let (parent_state_root, parent_state) = chain
.store
.get_advanced_hot_state(root, block.slot(), parent_block.state_root())?
.ok_or_else(|| {
BeaconChainError::DBInconsistent(format!("Missing state {:?}", parent_state_root))
BeaconChainError::DBInconsistent(
format!("Missing state for parent block {root:?}",),
)
})?;

metrics::inc_counter(&metrics::BLOCK_PROCESSING_SNAPSHOT_CACHE_MISSES);
Expand Down
86 changes: 49 additions & 37 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use operation_pool::{OperationPool, PersistedOperationPool};
use parking_lot::RwLock;
use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold};
use slasher::Slasher;
use slog::{crit, error, info, Logger};
use slog::{crit, debug, error, info, Logger};
use slot_clock::{SlotClock, TestingSlotClock};
use state_processing::per_slot_processing;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -291,7 +292,7 @@ where
let genesis_state = store
.get_state(&genesis_block.state_root(), Some(genesis_block.slot()))
.map_err(|e| descriptive_db_error("genesis state", &e))?
.ok_or("Genesis block not found in store")?;
.ok_or("Genesis state not found in store")?;

self.genesis_time = Some(genesis_state.genesis_time());

Expand Down Expand Up @@ -386,6 +387,16 @@ where
let (genesis, updated_builder) = self.set_genesis_state(beacon_state)?;
self = updated_builder;

// Stage the database's metadata fields for atomic storage when `build` is called.
// Since v4.4.0 we will set the anchor with a dummy state upper limit in order to prevent
// historic states from being retained (unless `--reconstruct-historic-states` is set).
let retain_historic_states = self.chain_config.reconstruct_historic_states;
self.pending_io_batch.push(
store
.init_anchor_info(genesis.beacon_block.message(), retain_historic_states)
.map_err(|e| format!("Failed to initialize genesis anchor: {:?}", e))?,
);

let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis)
.map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?;
let current_slot = None;
Expand All @@ -412,46 +423,48 @@ where
weak_subj_block: SignedBeaconBlock<TEthSpec>,
genesis_state: BeaconState<TEthSpec>,
) -> Result<Self, String> {
let store = self.store.clone().ok_or("genesis_state requires a store")?;

let weak_subj_slot = weak_subj_state.slot();
let weak_subj_block_root = weak_subj_block.canonical_root();
let weak_subj_state_root = weak_subj_block.state_root();

// Check that the given block lies on an epoch boundary. Due to the database only storing
// full states on epoch boundaries and at restore points it would be difficult to support
// starting from a mid-epoch state.
if weak_subj_slot % TEthSpec::slots_per_epoch() != 0 {
return Err(format!(
"Checkpoint block at slot {} is not aligned to epoch start. \
Please supply an aligned checkpoint with block.slot % 32 == 0",
weak_subj_block.slot(),
));
}
let store = self
.store
.clone()
.ok_or("weak_subjectivity_state requires a store")?;
let log = self
.log
.as_ref()
.ok_or("weak_subjectivity_state requires a log")?;

// Check that the block and state have consistent slots and state roots.
if weak_subj_state.slot() != weak_subj_block.slot() {
return Err(format!(
"Slot of snapshot block ({}) does not match snapshot state ({})",
weak_subj_block.slot(),
weak_subj_state.slot(),
));
// Ensure the state is advanced to an epoch boundary.
let slots_per_epoch = TEthSpec::slots_per_epoch();
if weak_subj_state.slot() % slots_per_epoch != 0 {
debug!(
log,
"Advancing checkpoint state to boundary";
"state_slot" => weak_subj_state.slot(),
"block_slot" => weak_subj_block.slot(),
);
while weak_subj_state.slot() % slots_per_epoch != 0 {
per_slot_processing(&mut weak_subj_state, None, &self.spec)
.map_err(|e| format!("Error advancing state: {e:?}"))?;
}
}

// Prime all caches before storing the state in the database and computing the tree hash
// root.
weak_subj_state
.build_caches(&self.spec)
.map_err(|e| format!("Error building caches on checkpoint state: {e:?}"))?;

let computed_state_root = weak_subj_state
let weak_subj_state_root = weak_subj_state
.update_tree_hash_cache()
.map_err(|e| format!("Error computing checkpoint state root: {:?}", e))?;

if weak_subj_state_root != computed_state_root {
let weak_subj_slot = weak_subj_state.slot();
let weak_subj_block_root = weak_subj_block.canonical_root();

// Validate the state's `latest_block_header` against the checkpoint block.
let state_latest_block_root = weak_subj_state.get_latest_block_root(weak_subj_state_root);
if weak_subj_block_root != state_latest_block_root {
return Err(format!(
"Snapshot state root does not match block, expected: {:?}, got: {:?}",
weak_subj_state_root, computed_state_root
"Snapshot state's most recent block root does not match block, expected: {:?}, got: {:?}",
weak_subj_block_root, state_latest_block_root
));
}

Expand All @@ -468,7 +481,7 @@ where

// Set the store's split point *before* storing genesis so that genesis is stored
// immediately in the freezer DB.
store.set_split(weak_subj_slot, weak_subj_state_root);
store.set_split(weak_subj_slot, weak_subj_state_root, weak_subj_block_root);
let (_, updated_builder) = self.set_genesis_state(genesis_state)?;
self = updated_builder;

Expand All @@ -484,10 +497,11 @@ where
// Stage the database's metadata fields for atomic storage when `build` is called.
// This prevents the database from restarting in an inconsistent state if the anchor
// info or split point is written before the `PersistedBeaconChain`.
let retain_historic_states = self.chain_config.reconstruct_historic_states;
self.pending_io_batch.push(store.store_split_in_batch());
self.pending_io_batch.push(
store
.init_anchor_info(weak_subj_block.message())
.init_anchor_info(weak_subj_block.message(), retain_historic_states)
.map_err(|e| format!("Failed to initialize anchor info: {:?}", e))?,
);

Expand All @@ -507,13 +521,12 @@ where
let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &snapshot)
.map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?;

let current_slot = Some(snapshot.beacon_block.slot());
let fork_choice = ForkChoice::from_anchor(
fc_store,
snapshot.beacon_block_root,
&snapshot.beacon_block,
&snapshot.beacon_state,
current_slot,
Some(weak_subj_slot),
&self.spec,
)
.map_err(|e| format!("Unable to initialize ForkChoice: {:?}", e))?;
Expand Down Expand Up @@ -689,9 +702,8 @@ where
Err(e) => return Err(descriptive_db_error("head block", &e)),
};

let head_state_root = head_block.state_root();
let head_state = store
.get_state(&head_state_root, Some(head_block.slot()))
let (_head_state_root, head_state) = store
.get_advanced_hot_state(head_block_root, current_slot, head_block.state_root())
.map_err(|e| descriptive_db_error("head state", &e))?
.ok_or("Head state not found in store")?;

Expand Down
23 changes: 14 additions & 9 deletions beacon_node/beacon_chain/src/canonical_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ use crate::{
};
use eth2::types::{EventKind, SseChainReorg, SseFinalizedCheckpoint, SseHead, SseLateHead};
use fork_choice::{
ExecutionStatus, ForkChoiceView, ForkchoiceUpdateParameters, ProtoBlock, ResetPayloadStatuses,
ExecutionStatus, ForkChoiceStore, ForkChoiceView, ForkchoiceUpdateParameters, ProtoBlock,
ResetPayloadStatuses,
};
use itertools::process_results;
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
Expand Down Expand Up @@ -298,10 +299,10 @@ impl<T: BeaconChainTypes> CanonicalHead<T> {
let beacon_block = store
.get_full_block(&beacon_block_root)?
.ok_or(Error::MissingBeaconBlock(beacon_block_root))?;
let beacon_state_root = beacon_block.state_root();
let beacon_state = store
.get_state(&beacon_state_root, Some(beacon_block.slot()))?
.ok_or(Error::MissingBeaconState(beacon_state_root))?;
let current_slot = fork_choice.fc_store().get_current_slot();
let (_, beacon_state) = store
.get_advanced_hot_state(beacon_block_root, current_slot, beacon_block.state_root())?
.ok_or(Error::MissingBeaconState(beacon_block.state_root()))?;

let snapshot = BeaconSnapshot {
beacon_block_root,
Expand Down Expand Up @@ -669,10 +670,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.get_full_block(&new_view.head_block_root)?
.ok_or(Error::MissingBeaconBlock(new_view.head_block_root))?;

let beacon_state_root = beacon_block.state_root();
let beacon_state: BeaconState<T::EthSpec> = self
.get_state(&beacon_state_root, Some(beacon_block.slot()))?
.ok_or(Error::MissingBeaconState(beacon_state_root))?;
let (_, beacon_state) = self
.store
.get_advanced_hot_state(
new_view.head_block_root,
current_slot,
beacon_block.state_root(),
)?
.ok_or(Error::MissingBeaconState(beacon_block.state_root()))?;

Ok(BeaconSnapshot {
beacon_block: Arc::new(beacon_block),
Expand Down
8 changes: 7 additions & 1 deletion beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
debug!(log, "Database consolidation started");

let finalized_state_root = notif.finalized_state_root;
let finalized_block_root = notif.finalized_checkpoint.root;

let finalized_state = match db.get_state(&finalized_state_root.into(), None) {
Ok(Some(state)) => state,
Expand Down Expand Up @@ -342,7 +343,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
}
};

match migrate_database(db.clone(), finalized_state_root.into(), &finalized_state) {
match migrate_database(
db.clone(),
finalized_state_root.into(),
finalized_block_root,
&finalized_state,
) {
Ok(()) => {}
Err(Error::HotColdDBError(HotColdDBError::FreezeSlotUnaligned(slot))) => {
debug!(
Expand Down
10 changes: 9 additions & 1 deletion beacon_node/beacon_chain/tests/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use beacon_chain::{
test_utils::{
test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
},
BeaconChain, BeaconChainError, BeaconChainTypes, WhenSlotSkipped,
BeaconChain, BeaconChainError, BeaconChainTypes, ChainConfig, WhenSlotSkipped,
};
use genesis::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH};
use int_to_bytes::int_to_bytes32;
Expand Down Expand Up @@ -47,6 +47,10 @@ fn get_harness(validator_count: usize) -> BeaconChainHarness<EphemeralHarnessTyp

let harness = BeaconChainHarness::builder(MainnetEthSpec)
.spec(spec)
.chain_config(ChainConfig {
reconstruct_historic_states: true,
..ChainConfig::default()
})
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.fresh_ephemeral_store()
.mock_execution_layer()
Expand Down Expand Up @@ -79,6 +83,10 @@ fn get_harness_capella_spec(

let harness = BeaconChainHarness::builder(MainnetEthSpec)
.spec(spec.clone())
.chain_config(ChainConfig {
reconstruct_historic_states: true,
..ChainConfig::default()
})
.keypairs(validator_keypairs)
.withdrawal_keypairs(
KEYPAIRS[0..validator_count]
Expand Down
6 changes: 5 additions & 1 deletion beacon_node/beacon_chain/tests/payload_invalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use beacon_chain::otb_verification_service::{
use beacon_chain::{
canonical_head::{CachedHead, CanonicalHead},
test_utils::{BeaconChainHarness, EphemeralHarnessType},
BeaconChainError, BlockError, ExecutionPayloadError, NotifyExecutionLayer,
BeaconChainError, BlockError, ChainConfig, ExecutionPayloadError, NotifyExecutionLayer,
OverrideForkchoiceUpdate, StateSkipConfig, WhenSlotSkipped,
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON,
Expand Down Expand Up @@ -59,6 +59,10 @@ impl InvalidPayloadRig {

let harness = BeaconChainHarness::builder(MainnetEthSpec)
.spec(spec)
.chain_config(ChainConfig {
reconstruct_historic_states: true,
..ChainConfig::default()
})
.logger(test_logger())
.deterministic_keypairs(VALIDATOR_COUNT)
.mock_execution_layer()
Expand Down
6 changes: 5 additions & 1 deletion beacon_node/beacon_chain/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use beacon_chain::{
AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
OP_POOL_DB_KEY,
},
BeaconChain, NotifyExecutionLayer, StateSkipConfig, WhenSlotSkipped,
BeaconChain, ChainConfig, NotifyExecutionLayer, StateSkipConfig, WhenSlotSkipped,
};
use lazy_static::lazy_static;
use operation_pool::PersistedOperationPool;
Expand All @@ -28,6 +28,10 @@ lazy_static! {
fn get_harness(validator_count: usize) -> BeaconChainHarness<EphemeralHarnessType<MinimalEthSpec>> {
let harness = BeaconChainHarness::builder(MinimalEthSpec)
.default_spec()
.chain_config(ChainConfig {
reconstruct_historic_states: true,
..ChainConfig::default()
})
.keypairs(KEYPAIRS[0..validator_count].to_vec())
.fresh_ephemeral_store()
.mock_execution_layer()
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ edition = "2021"

[dev-dependencies]
serde_yaml = "0.8.13"
state_processing = { path = "../../consensus/state_processing" }
operation_pool = { path = "../operation_pool" }
tokio = "1.14.0"

[dependencies]
state_processing = { path = "../../consensus/state_processing" }
beacon_chain = { path = "../beacon_chain" }
store = { path = "../store" }
network = { path = "../network" }
Expand Down
Loading

0 comments on commit a9644f4

Please sign in to comment.