Skip to content

Commit

Permalink
Single-pass epoch processing and optimised block processing (#5279)
Browse files Browse the repository at this point in the history
* Single-pass epoch processing (#4483, #4573)

Co-authored-by: Michael Sproul <michael@sigmaprime.io>

* Delete unused epoch processing code (#5170)

* Delete unused epoch processing code

* Compare total deltas

* Remove unnecessary apply_pending

* cargo fmt

* Remove newline

* Use epoch cache in block packing (#5223)

* Remove progressive balances mode (#5224)

* inline inactivity_penalty_quotient_for_state

* drop previous_epoch_total_active_balance

* fc lint

* spec compliant process_sync_aggregate (#15)

* spec compliant process_sync_aggregate

* Update consensus/state_processing/src/per_block_processing/altair/sync_committee.rs

Co-authored-by: Michael Sproul <micsproul@gmail.com>

---------

Co-authored-by: Michael Sproul <micsproul@gmail.com>

* Delete the participation cache (#16)

* update help

* Fix op_pool tests

* Fix fork choice tests

* Merge remote-tracking branch 'sigp/unstable' into epoch-single-pass

* Simplify exit cache (#5280)

* Fix clippy on exit cache

* Clean up single-pass a bit (#5282)

* Address Mark's review of single-pass (#5386)

* Merge remote-tracking branch 'origin/unstable' into epoch-single-pass

* Address Sean's review comments (#5414)

* Address most of Sean's review comments

* Simplify total balance cache building

* Clean up unused junk

* Merge remote-tracking branch 'origin/unstable' into epoch-single-pass

* More self-review

* Merge remote-tracking branch 'origin/unstable' into epoch-single-pass

* Merge branch 'unstable' into epoch-single-pass

* Fix imports for beta compiler

* Fix tests, probably
  • Loading branch information
michaelsproul authored Apr 4, 2024
1 parent f4cdcea commit feb531f
Show file tree
Hide file tree
Showing 81 changed files with 2,549 additions and 1,320 deletions.
132 changes: 83 additions & 49 deletions beacon_node/beacon_chain/src/attestation_rewards.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,14 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2::lighthouse::attestation_rewards::{IdealAttestationRewards, TotalAttestationRewards};
use eth2::lighthouse::StandardAttestationRewards;
use participation_cache::ParticipationCache;
use eth2::types::ValidatorId;
use safe_arith::SafeArith;
use serde_utils::quoted_u64::Quoted;
use slog::debug;
use state_processing::common::base::{self, SqrtTotalActiveBalance};
use state_processing::per_epoch_processing::altair::{
process_inactivity_updates, process_justification_and_finalization,
};
use state_processing::{
common::altair::BaseRewardPerIncrement,
per_epoch_processing::altair::{participation_cache, rewards_and_penalties::get_flag_weight},
};
use std::collections::HashMap;
use store::consts::altair::{
PARTICIPATION_FLAG_WEIGHTS, TIMELY_HEAD_FLAG_INDEX, TIMELY_SOURCE_FLAG_INDEX,
TIMELY_TARGET_FLAG_INDEX,
process_inactivity_updates_slow, process_justification_and_finalization,
};
use types::consts::altair::WEIGHT_DENOMINATOR;

use types::{BeaconState, Epoch, EthSpec};

use eth2::types::ValidatorId;
use state_processing::common::base::get_base_reward_from_effective_balance;
use state_processing::per_epoch_processing::base::rewards_and_penalties::{
get_attestation_component_delta, get_attestation_deltas_all, get_attestation_deltas_subset,
get_inactivity_penalty_delta, get_inclusion_delay_delta,
Expand All @@ -32,6 +18,19 @@ use state_processing::per_epoch_processing::base::{
process_justification_and_finalization as process_justification_and_finalization_base,
TotalBalances, ValidatorStatus, ValidatorStatuses,
};
use state_processing::{
common::altair::BaseRewardPerIncrement,
common::update_progressive_balances_cache::initialize_progressive_balances_cache,
epoch_cache::initialize_epoch_cache,
per_epoch_processing::altair::rewards_and_penalties::get_flag_weight,
};
use std::collections::HashMap;
use store::consts::altair::{
PARTICIPATION_FLAG_WEIGHTS, TIMELY_HEAD_FLAG_INDEX, TIMELY_SOURCE_FLAG_INDEX,
TIMELY_TARGET_FLAG_INDEX,
};
use types::consts::altair::WEIGHT_DENOMINATOR;
use types::{BeaconState, Epoch, EthSpec, RelativeEpoch};

impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn compute_attestation_rewards(
Expand Down Expand Up @@ -134,11 +133,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<StandardAttestationRewards, BeaconChainError> {
let spec = &self.spec;

// Build required caches.
initialize_epoch_cache(&mut state, spec)?;
initialize_progressive_balances_cache(&mut state, spec)?;
state.build_exit_cache(spec)?;
state.build_committee_cache(RelativeEpoch::Previous, spec)?;
state.build_committee_cache(RelativeEpoch::Current, spec)?;

// Calculate ideal_rewards
let participation_cache = ParticipationCache::new(&state, spec)?;
process_justification_and_finalization(&state, &participation_cache)?
.apply_changes_to_state(&mut state);
process_inactivity_updates(&mut state, &participation_cache, spec)?;
process_justification_and_finalization(&state)?.apply_changes_to_state(&mut state);
process_inactivity_updates_slow(&mut state, spec)?;

let previous_epoch = state.previous_epoch();

Expand All @@ -148,18 +152,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let weight = get_flag_weight(flag_index)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;

let unslashed_participating_indices = participation_cache
.get_unslashed_participating_indices(flag_index, previous_epoch)?;

let unslashed_participating_balance =
unslashed_participating_indices
.total_balance()
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
let unslashed_participating_balance = state
.progressive_balances_cache()
.previous_epoch_flag_attesting_balance(flag_index)?;

let unslashed_participating_increments =
unslashed_participating_balance.safe_div(spec.effective_balance_increment)?;

let total_active_balance = participation_cache.current_epoch_total_active_balance();
let total_active_balance = state.get_total_active_balance()?;

let active_increments =
total_active_balance.safe_div(spec.effective_balance_increment)?;
Expand Down Expand Up @@ -195,30 +195,49 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut total_rewards: Vec<TotalAttestationRewards> = Vec::new();

let validators = if validators.is_empty() {
participation_cache.eligible_validator_indices().to_vec()
Self::all_eligible_validator_indices(&state, previous_epoch)?
} else {
Self::validators_ids_to_indices(&mut state, validators)?
};

for validator_index in &validators {
let eligible = state.is_eligible_validator(previous_epoch, *validator_index)?;
for &validator_index in &validators {
// Return 0s for unknown/inactive validator indices.
let Ok(validator) = state.get_validator(validator_index) else {
debug!(
self.log,
"No rewards for inactive/unknown validator";
"index" => validator_index,
"epoch" => previous_epoch
);
total_rewards.push(TotalAttestationRewards {
validator_index: validator_index as u64,
head: 0,
target: 0,
source: 0,
inclusion_delay: None,
inactivity: 0,
});
continue;
};
let previous_epoch_participation_flags = state
.previous_epoch_participation()?
.get(validator_index)
.ok_or(BeaconChainError::AttestationRewardsError)?;
let eligible = state.is_eligible_validator(previous_epoch, validator)?;
let mut head_reward = 0i64;
let mut target_reward = 0i64;
let mut source_reward = 0i64;
let mut inactivity_penalty = 0i64;

if eligible {
let effective_balance = state.get_effective_balance(*validator_index)?;
let effective_balance = validator.effective_balance;

for flag_index in 0..PARTICIPATION_FLAG_WEIGHTS.len() {
let (ideal_reward, penalty) = ideal_rewards_hashmap
.get(&(flag_index, effective_balance))
.ok_or(BeaconChainError::AttestationRewardsError)?;
let voted_correctly = participation_cache
.get_unslashed_participating_indices(flag_index, previous_epoch)
.map_err(|_| BeaconChainError::AttestationRewardsError)?
.contains(*validator_index)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
let voted_correctly = !validator.slashed
&& previous_epoch_participation_flags.has_flag(flag_index)?;
if voted_correctly {
if flag_index == TIMELY_HEAD_FLAG_INDEX {
head_reward += *ideal_reward as i64;
Expand All @@ -233,10 +252,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
target_reward = *penalty;

let penalty_numerator = effective_balance
.safe_mul(state.get_inactivity_score(*validator_index)?)?;
let penalty_denominator = spec
.inactivity_score_bias
.safe_mul(spec.inactivity_penalty_quotient_for_state(&state))?;
.safe_mul(state.get_inactivity_score(validator_index)?)?;
let penalty_denominator = spec.inactivity_score_bias.safe_mul(
spec.inactivity_penalty_quotient_for_fork(state.fork_name_unchecked()),
)?;
inactivity_penalty =
-(penalty_numerator.safe_div(penalty_denominator)? as i64);
} else if flag_index == TIMELY_SOURCE_FLAG_INDEX {
Expand All @@ -245,7 +264,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
total_rewards.push(TotalAttestationRewards {
validator_index: *validator_index as u64,
validator_index: validator_index as u64,
head: head_reward,
target: target_reward,
source: source_reward,
Expand Down Expand Up @@ -302,6 +321,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(max_steps)
}

fn all_eligible_validator_indices(
state: &BeaconState<T::EthSpec>,
previous_epoch: Epoch,
) -> Result<Vec<usize>, BeaconChainError> {
state
.validators()
.iter()
.enumerate()
.filter_map(|(i, validator)| {
state
.is_eligible_validator(previous_epoch, validator)
.map(|eligible| eligible.then_some(i))
.map_err(BeaconChainError::BeaconStateError)
.transpose()
})
.collect()
}

fn validators_ids_to_indices(
state: &mut BeaconState<T::EthSpec>,
validators: Vec<ValidatorId>,
Expand Down Expand Up @@ -340,15 +377,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
};

let mut ideal_attestation_rewards_list = Vec::new();

let sqrt_total_active_balance = SqrtTotalActiveBalance::new(total_balances.current_epoch());
for effective_balance_step in 1..=self.max_effective_balance_increment_steps()? {
let effective_balance =
effective_balance_step.safe_mul(spec.effective_balance_increment)?;
let base_reward = get_base_reward_from_effective_balance::<T::EthSpec>(
effective_balance,
total_balances.current_epoch(),
spec,
)?;
let base_reward =
base::get_base_reward(effective_balance, sqrt_total_active_balance, spec)?;

// compute ideal head rewards
let head = get_attestation_component_delta(
Expand Down
21 changes: 5 additions & 16 deletions beacon_node/beacon_chain/src/beacon_block_reward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use operation_pool::RewardCache;
use safe_arith::SafeArith;
use slog::error;
use state_processing::{
common::{
altair, get_attestation_participation_flag_indices, get_attesting_indices_from_state,
},
common::{get_attestation_participation_flag_indices, get_attesting_indices_from_state},
epoch_cache::initialize_epoch_cache,
per_block_processing::{
altair::sync_committee::compute_sync_aggregate_rewards, get_slashable_indices,
},
Expand All @@ -32,6 +31,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

state.build_committee_cache(RelativeEpoch::Previous, &self.spec)?;
state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;
initialize_epoch_cache(state, &self.spec)?;

self.compute_beacon_block_reward_with_cache(block, block_root, state)
}
Expand Down Expand Up @@ -191,10 +191,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block: BeaconBlockRef<'_, T::EthSpec, Payload>,
state: &BeaconState<T::EthSpec>,
) -> Result<BeaconBlockSubRewardValue, BeaconChainError> {
let total_active_balance = state.get_total_active_balance()?;
let base_reward_per_increment =
altair::BaseRewardPerIncrement::new(total_active_balance, &self.spec)?;

let mut total_proposer_reward = 0;

let proposer_reward_denominator = WEIGHT_DENOMINATOR
Expand Down Expand Up @@ -235,15 +231,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&& !validator_participation.has_flag(flag_index)?
{
validator_participation.add_flag(flag_index)?;
proposer_reward_numerator.safe_add_assign(
altair::get_base_reward(
state,
index,
base_reward_per_increment,
&self.spec,
)?
.safe_mul(weight)?,
)?;
proposer_reward_numerator
.safe_add_assign(state.get_base_reward(index)?.safe_mul(weight)?)?;
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ use slot_clock::SlotClock;
use ssz::Encode;
use state_processing::{
common::get_attesting_indices_from_state,
epoch_cache::initialize_epoch_cache,
per_block_processing,
per_block_processing::{
errors::AttestationValidationError, get_expected_withdrawals,
Expand Down Expand Up @@ -3360,9 +3361,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_delay,
&state,
payload_verification_status,
self.config.progressive_balances_mode,
&self.spec,
&self.log,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
}
Expand Down Expand Up @@ -4981,6 +4980,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let attestation_packing_timer =
metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES);

// Epoch cache and total balance cache are required for op pool packing.
state.build_total_active_balance_cache(&self.spec)?;
initialize_epoch_cache(&mut state, &self.spec)?;

let mut prev_filter_cache = HashMap::new();
let prev_attestation_filter = |att: &AttestationRef<T::EthSpec>| {
self.filter_op_pool_attestation(&mut prev_filter_cache, att, &state)
Expand Down
2 changes: 0 additions & 2 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -775,8 +775,6 @@ where
store.clone(),
Some(current_slot),
&self.spec,
self.chain_config.progressive_balances_mode,
&log,
)?;
}

Expand Down
5 changes: 1 addition & 4 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use types::{Checkpoint, Epoch, ProgressiveBalancesMode};
use types::{Checkpoint, Epoch};

pub const DEFAULT_RE_ORG_THRESHOLD: ReOrgThreshold = ReOrgThreshold(20);
pub const DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION: Epoch = Epoch::new(2);
Expand Down Expand Up @@ -81,8 +81,6 @@ pub struct ChainConfig {
///
/// This is useful for block builders and testing.
pub always_prepare_payload: bool,
/// Whether to use `ProgressiveBalancesCache` in unrealized FFG progression calculation.
pub progressive_balances_mode: ProgressiveBalancesMode,
/// Number of epochs between each migration of data from the hot database to the freezer.
pub epochs_per_migration: u64,
/// When set to true Light client server computes and caches state proofs for serving updates
Expand Down Expand Up @@ -117,7 +115,6 @@ impl Default for ChainConfig {
snapshot_cache_size: crate::snapshot_cache::DEFAULT_SNAPSHOT_CACHE_SIZE,
genesis_backfill: false,
always_prepare_payload: false,
progressive_balances_mode: ProgressiveBalancesMode::Fast,
epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION,
enable_light_client_server: false,
}
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub enum BeaconChainError {
SlotClockDidNotStart,
NoStateForSlot(Slot),
BeaconStateError(BeaconStateError),
EpochCacheError(EpochCacheError),
DBInconsistent(String),
DBError(store::Error),
ForkChoiceError(ForkChoiceError),
Expand Down Expand Up @@ -250,6 +251,7 @@ easy_from_to!(StateAdvanceError, BeaconChainError);
easy_from_to!(BlockReplayError, BeaconChainError);
easy_from_to!(InconsistentFork, BeaconChainError);
easy_from_to!(AvailabilityCheckError, BeaconChainError);
easy_from_to!(EpochCacheError, BeaconChainError);
easy_from_to!(LightClientError, BeaconChainError);

#[derive(Debug)]
Expand All @@ -259,6 +261,7 @@ pub enum BlockProductionError {
UnableToProduceAtSlot(Slot),
SlotProcessingError(SlotProcessingError),
BlockProcessingError(BlockProcessingError),
EpochCacheError(EpochCacheError),
ForkChoiceError(ForkChoiceError),
Eth1ChainError(Eth1ChainError),
BeaconStateError(BeaconStateError),
Expand Down Expand Up @@ -298,3 +301,4 @@ easy_from_to!(SlotProcessingError, BlockProductionError);
easy_from_to!(Eth1ChainError, BlockProductionError);
easy_from_to!(StateAdvanceError, BlockProductionError);
easy_from_to!(ForkChoiceError, BlockProductionError);
easy_from_to!(EpochCacheError, BlockProductionError);
9 changes: 1 addition & 8 deletions beacon_node/beacon_chain/src/fork_revert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ use state_processing::{
use std::sync::Arc;
use std::time::Duration;
use store::{iter::ParentRootBlockIterator, HotColdDB, ItemStore};
use types::{
BeaconState, ChainSpec, EthSpec, ForkName, Hash256, ProgressiveBalancesMode, SignedBeaconBlock,
Slot,
};
use types::{BeaconState, ChainSpec, EthSpec, ForkName, Hash256, SignedBeaconBlock, Slot};

const CORRUPT_DB_MESSAGE: &str = "The database could be corrupt. Check its file permissions or \
consider deleting it by running with the --purge-db flag.";
Expand Down Expand Up @@ -103,8 +100,6 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
store: Arc<HotColdDB<E, Hot, Cold>>,
current_slot: Option<Slot>,
spec: &ChainSpec,
progressive_balances_mode: ProgressiveBalancesMode,
log: &Logger,
) -> Result<ForkChoice<BeaconForkChoiceStore<E, Hot, Cold>, E>, String> {
// Fetch finalized block.
let finalized_checkpoint = head_state.finalized_checkpoint();
Expand Down Expand Up @@ -202,9 +197,7 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
Duration::from_secs(0),
&state,
payload_verification_status,
progressive_balances_mode,
spec,
log,
)
.map_err(|e| format!("Error applying replayed block to fork choice: {:?}", e))?;
}
Expand Down
Loading

0 comments on commit feb531f

Please sign in to comment.