Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Epoch single pass #13

Closed
wants to merge 13 commits into from
89 changes: 60 additions & 29 deletions beacon_node/beacon_chain/src/attestation_rewards.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2::lighthouse::attestation_rewards::{IdealAttestationRewards, TotalAttestationRewards};
use eth2::lighthouse::StandardAttestationRewards;
use participation_cache::ParticipationCache;
use safe_arith::SafeArith;
use serde_utils::quoted_u64::Quoted;
use slog::debug;
use state_processing::per_epoch_processing::altair::{
process_inactivity_updates, process_justification_and_finalization,
process_inactivity_updates_slow, process_justification_and_finalization,
};
use state_processing::{
common::altair::BaseRewardPerIncrement,
per_epoch_processing::altair::{participation_cache, rewards_and_penalties::get_flag_weight},
per_epoch_processing::altair::rewards_and_penalties::get_flag_weight,
};
use std::collections::HashMap;
use store::consts::altair::{
Expand Down Expand Up @@ -134,10 +133,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let spec = &self.spec;

// Calculate ideal_rewards
let participation_cache = ParticipationCache::new(&state, spec)?;
process_justification_and_finalization(&state, &participation_cache)?
.apply_changes_to_state(&mut state);
process_inactivity_updates(&mut state, &participation_cache, spec)?;
process_justification_and_finalization(&state)?.apply_changes_to_state(&mut state);
process_inactivity_updates_slow(&mut state, spec)?;

let previous_epoch = state.previous_epoch();

Expand All @@ -147,18 +144,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let weight = get_flag_weight(flag_index)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;

let unslashed_participating_indices = participation_cache
.get_unslashed_participating_indices(flag_index, previous_epoch)?;

let unslashed_participating_balance =
unslashed_participating_indices
.total_balance()
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
let unslashed_participating_balance = state
.progressive_balances_cache()
.previous_epoch_flag_attesting_balance(flag_index)?;

let unslashed_participating_increments =
unslashed_participating_balance.safe_div(spec.effective_balance_increment)?;

let total_active_balance = participation_cache.current_epoch_total_active_balance();
let total_active_balance = state.get_total_active_balance()?;

let active_increments =
total_active_balance.safe_div(spec.effective_balance_increment)?;
Expand Down Expand Up @@ -194,30 +187,50 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut total_rewards: Vec<TotalAttestationRewards> = Vec::new();

let validators = if validators.is_empty() {
participation_cache.eligible_validator_indices().to_vec()
Self::all_eligible_validator_indices(&state, previous_epoch)?
} else {
Self::validators_ids_to_indices(&mut state, validators)?
};

for validator_index in &validators {
let eligible = state.is_eligible_validator(previous_epoch, *validator_index)?;
for &validator_index in &validators {
// Return 0s for unknown/inactive validator indices. This is a bit different from stable
// where we error for unknown pubkeys.
dapplion marked this conversation as resolved.
Show resolved Hide resolved
let Ok(validator) = state.get_validator(validator_index) else {
debug!(
self.log,
"No rewards for inactive/unknown validator";
"index" => validator_index,
"epoch" => previous_epoch
);
total_rewards.push(TotalAttestationRewards {
validator_index: validator_index as u64,
head: 0,
target: 0,
source: 0,
inclusion_delay: None,
inactivity: 0,
});
continue;
};
let previous_epoch_participation_flags = state
.previous_epoch_participation()?
.get(validator_index)
.ok_or(BeaconChainError::AttestationRewardsError)?;
let eligible = state.is_eligible_validator(previous_epoch, validator)?;
let mut head_reward = 0i64;
let mut target_reward = 0i64;
let mut source_reward = 0i64;
let mut inactivity_penalty = 0i64;

if eligible {
let effective_balance = state.get_effective_balance(*validator_index)?;
let effective_balance = validator.effective_balance;

for flag_index in 0..PARTICIPATION_FLAG_WEIGHTS.len() {
let (ideal_reward, penalty) = ideal_rewards_hashmap
.get(&(flag_index, effective_balance))
.ok_or(BeaconChainError::AttestationRewardsError)?;
let voted_correctly = participation_cache
.get_unslashed_participating_indices(flag_index, previous_epoch)
.map_err(|_| BeaconChainError::AttestationRewardsError)?
.contains(*validator_index)
.map_err(|_| BeaconChainError::AttestationRewardsError)?;
let voted_correctly = !validator.slashed
&& previous_epoch_participation_flags.has_flag(flag_index)?;
if voted_correctly {
if flag_index == TIMELY_HEAD_FLAG_INDEX {
head_reward += *ideal_reward as i64;
Expand All @@ -232,10 +245,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
target_reward = *penalty;

let penalty_numerator = effective_balance
.safe_mul(state.get_inactivity_score(*validator_index)?)?;
let penalty_denominator = spec
.inactivity_score_bias
.safe_mul(spec.inactivity_penalty_quotient_for_state(&state))?;
.safe_mul(state.get_inactivity_score(validator_index)?)?;
let penalty_denominator = spec.inactivity_score_bias.safe_mul(
spec.inactivity_penalty_quotient_for_fork(state.fork_name_unchecked()),
)?;
inactivity_penalty =
-(penalty_numerator.safe_div(penalty_denominator)? as i64);
} else if flag_index == TIMELY_SOURCE_FLAG_INDEX {
Expand All @@ -244,7 +257,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
total_rewards.push(TotalAttestationRewards {
validator_index: *validator_index as u64,
validator_index: validator_index as u64,
head: head_reward,
target: target_reward,
source: source_reward,
Expand Down Expand Up @@ -301,6 +314,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(max_steps)
}

fn all_eligible_validator_indices(
state: &BeaconState<T::EthSpec>,
previous_epoch: Epoch,
) -> Result<Vec<usize>, BeaconChainError> {
state
.validators()
.iter()
.enumerate()
.filter_map(|(i, validator)| {
state
.is_eligible_validator(previous_epoch, validator)
.map(|eligible| eligible.then_some(i))
.map_err(BeaconChainError::BeaconStateError)
.transpose()
})
.collect()
}

fn validators_ids_to_indices(
state: &mut BeaconState<T::EthSpec>,
validators: Vec<ValidatorId>,
Expand Down
21 changes: 5 additions & 16 deletions beacon_node/beacon_chain/src/beacon_block_reward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use operation_pool::RewardCache;
use safe_arith::SafeArith;
use slog::error;
use state_processing::{
common::{
altair, get_attestation_participation_flag_indices, get_attesting_indices_from_state,
},
common::{get_attestation_participation_flag_indices, get_attesting_indices_from_state},
epoch_cache::initialize_epoch_cache,
per_block_processing::{
altair::sync_committee::compute_sync_aggregate_rewards, get_slashable_indices,
},
Expand All @@ -32,6 +31,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

state.build_committee_cache(RelativeEpoch::Previous, &self.spec)?;
state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;
initialize_epoch_cache(state, &self.spec)?;

self.compute_beacon_block_reward_with_cache(block, block_root, state)
}
Expand Down Expand Up @@ -191,10 +191,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block: BeaconBlockRef<'_, T::EthSpec, Payload>,
state: &BeaconState<T::EthSpec>,
) -> Result<BeaconBlockSubRewardValue, BeaconChainError> {
let total_active_balance = state.get_total_active_balance()?;
let base_reward_per_increment =
altair::BaseRewardPerIncrement::new(total_active_balance, &self.spec)?;

let mut total_proposer_reward = 0;

let proposer_reward_denominator = WEIGHT_DENOMINATOR
Expand Down Expand Up @@ -235,15 +231,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&& !validator_participation.has_flag(flag_index)?
{
validator_participation.add_flag(flag_index)?;
proposer_reward_numerator.safe_add_assign(
altair::get_base_reward(
state,
index,
base_reward_per_increment,
&self.spec,
)?
.safe_mul(weight)?,
)?;
proposer_reward_numerator
.safe_add_assign(state.get_base_reward(index)?.safe_mul(weight)?)?;
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ use slot_clock::SlotClock;
use ssz::Encode;
use state_processing::{
common::get_attesting_indices_from_state,
epoch_cache::initialize_epoch_cache,
per_block_processing,
per_block_processing::{
errors::AttestationValidationError, get_expected_withdrawals,
Expand Down Expand Up @@ -3354,9 +3355,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_delay,
&state,
payload_verification_status,
self.config.progressive_balances_mode,
&self.spec,
&self.log,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
}
Expand Down Expand Up @@ -4953,6 +4952,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let attestation_packing_timer =
metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES);

// Epoch cache and total balance cache are required for op pool packing.
state.build_total_active_balance_cache_at(state.current_epoch(), &self.spec)?;
initialize_epoch_cache(&mut state, &self.spec)?;

let mut prev_filter_cache = HashMap::new();
let prev_attestation_filter = |att: &AttestationRef<T::EthSpec>| {
self.filter_op_pool_attestation(&mut prev_filter_cache, att, &state)
Expand Down
2 changes: 0 additions & 2 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,8 +780,6 @@ where
store.clone(),
Some(current_slot),
&self.spec,
self.chain_config.progressive_balances_mode,
&log,
)?;
}

Expand Down
5 changes: 1 addition & 4 deletions beacon_node/beacon_chain/src/chain_config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use types::{Checkpoint, Epoch, ProgressiveBalancesMode};
use types::{Checkpoint, Epoch};

pub const DEFAULT_RE_ORG_THRESHOLD: ReOrgThreshold = ReOrgThreshold(20);
pub const DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION: Epoch = Epoch::new(2);
Expand Down Expand Up @@ -79,8 +79,6 @@ pub struct ChainConfig {
///
/// This is useful for block builders and testing.
pub always_prepare_payload: bool,
/// Whether to use `ProgressiveBalancesCache` in unrealized FFG progression calculation.
pub progressive_balances_mode: ProgressiveBalancesMode,
/// Number of epochs between each migration of data from the hot database to the freezer.
pub epochs_per_migration: u64,
/// When set to true Light client server computes and caches state proofs for serving updates
Expand Down Expand Up @@ -114,7 +112,6 @@ impl Default for ChainConfig {
shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE,
genesis_backfill: false,
always_prepare_payload: false,
progressive_balances_mode: ProgressiveBalancesMode::Fast,
epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION,
enable_light_client_server: false,
}
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub enum BeaconChainError {
SlotClockDidNotStart,
NoStateForSlot(Slot),
BeaconStateError(BeaconStateError),
EpochCacheError(EpochCacheError),
DBInconsistent(String),
DBError(store::Error),
ForkChoiceError(ForkChoiceError),
Expand Down Expand Up @@ -250,6 +251,7 @@ easy_from_to!(StateAdvanceError, BeaconChainError);
easy_from_to!(BlockReplayError, BeaconChainError);
easy_from_to!(InconsistentFork, BeaconChainError);
easy_from_to!(AvailabilityCheckError, BeaconChainError);
easy_from_to!(EpochCacheError, BeaconChainError);

#[derive(Debug)]
pub enum BlockProductionError {
Expand All @@ -258,6 +260,7 @@ pub enum BlockProductionError {
UnableToProduceAtSlot(Slot),
SlotProcessingError(SlotProcessingError),
BlockProcessingError(BlockProcessingError),
EpochCacheError(EpochCacheError),
ForkChoiceError(ForkChoiceError),
Eth1ChainError(Eth1ChainError),
BeaconStateError(BeaconStateError),
Expand Down Expand Up @@ -297,3 +300,4 @@ easy_from_to!(SlotProcessingError, BlockProductionError);
easy_from_to!(Eth1ChainError, BlockProductionError);
easy_from_to!(StateAdvanceError, BlockProductionError);
easy_from_to!(ForkChoiceError, BlockProductionError);
easy_from_to!(EpochCacheError, BlockProductionError);
9 changes: 1 addition & 8 deletions beacon_node/beacon_chain/src/fork_revert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ use state_processing::{
use std::sync::Arc;
use std::time::Duration;
use store::{iter::ParentRootBlockIterator, HotColdDB, ItemStore};
use types::{
BeaconState, ChainSpec, EthSpec, ForkName, Hash256, ProgressiveBalancesMode, SignedBeaconBlock,
Slot,
};
use types::{BeaconState, ChainSpec, EthSpec, ForkName, Hash256, SignedBeaconBlock, Slot};

const CORRUPT_DB_MESSAGE: &str = "The database could be corrupt. Check its file permissions or \
consider deleting it by running with the --purge-db flag.";
Expand Down Expand Up @@ -103,8 +100,6 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
store: Arc<HotColdDB<E, Hot, Cold>>,
current_slot: Option<Slot>,
spec: &ChainSpec,
progressive_balances_mode: ProgressiveBalancesMode,
log: &Logger,
) -> Result<ForkChoice<BeaconForkChoiceStore<E, Hot, Cold>, E>, String> {
// Fetch finalized block.
let finalized_checkpoint = head_state.finalized_checkpoint();
Expand Down Expand Up @@ -202,9 +197,7 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
Duration::from_secs(0),
&state,
payload_verification_status,
progressive_balances_mode,
spec,
log,
)
.map_err(|e| format!("Error applying replayed block to fork choice: {:?}", e))?;
}
Expand Down
2 changes: 0 additions & 2 deletions beacon_node/beacon_chain/tests/payload_invalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1077,9 +1077,7 @@ async fn invalid_parent() {
Duration::from_secs(0),
&state,
PayloadVerificationStatus::Optimistic,
rig.harness.chain.config.progressive_balances_mode,
&rig.harness.chain.spec,
rig.harness.logger()
),
Err(ForkChoiceError::ProtoArrayStringError(message))
if message.contains(&format!(
Expand Down
8 changes: 2 additions & 6 deletions beacon_node/beacon_chain/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use beacon_chain::{
};
use lazy_static::lazy_static;
use operation_pool::PersistedOperationPool;
use state_processing::{
per_slot_processing, per_slot_processing::Error as SlotProcessingError, EpochProcessingError,
};
use state_processing::{per_slot_processing, per_slot_processing::Error as SlotProcessingError};
use types::{
BeaconState, BeaconStateError, EthSpec, Hash256, Keypair, MinimalEthSpec, RelativeEpoch, Slot,
};
Expand Down Expand Up @@ -59,9 +57,7 @@ fn massive_skips() {
assert!(state.slot() > 1, "the state should skip at least one slot");
assert_eq!(
error,
SlotProcessingError::EpochProcessingError(EpochProcessingError::BeaconStateError(
BeaconStateError::InsufficientValidators
)),
SlotProcessingError::BeaconStateError(BeaconStateError::InsufficientValidators),
"should return error indicating that validators have been slashed out"
)
}
Expand Down
10 changes: 6 additions & 4 deletions beacon_node/genesis/src/interop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,14 @@ mod test {
}

for v in state.validators() {
let creds = v.withdrawal_credentials.as_bytes();
let creds = v.withdrawal_credentials;
assert_eq!(
creds[0], spec.bls_withdrawal_prefix_byte,
creds.as_bytes()[0],
spec.bls_withdrawal_prefix_byte,
"first byte of withdrawal creds should be bls prefix"
);
assert_eq!(
&creds[1..],
&creds.as_bytes()[1..],
&hash(&v.pubkey.as_ssz_bytes())[1..],
"rest of withdrawal creds should be pubkey hash"
)
Expand Down Expand Up @@ -240,7 +241,8 @@ mod test {
}

for (index, v) in state.validators().iter().enumerate() {
let creds = v.withdrawal_credentials.as_bytes();
let withdrawal_credientials = v.withdrawal_credentials;
let creds = withdrawal_credientials.as_bytes();
if index % 2 == 0 {
assert_eq!(
creds[0], spec.bls_withdrawal_prefix_byte,
Expand Down
Loading
Loading