Skip to content

Commit

Permalink
Add prepare whisk beacon proposer functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Aug 10, 2023
1 parent 6f4f634 commit c9ae042
Show file tree
Hide file tree
Showing 11 changed files with 364 additions and 64 deletions.
156 changes: 98 additions & 58 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache};
use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::beacon_whisk_proposer_registry::{self, BeaconWhiskProposerRegistry};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::{
check_block_is_finalized_checkpoint_or_descendant, check_block_relevancy, get_block_root,
Expand Down Expand Up @@ -428,6 +429,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub eth1_finalization_cache: TimeoutRwLock<Eth1FinalizationCache>,
/// Caches the beacon block proposer shuffling for a given epoch and shuffling key root.
pub beacon_proposer_cache: Mutex<BeaconProposerCache>,
/// Tracks validators self-submissions that they will propose at a slot post-whisk
pub beacon_whisk_proposer_registry: Arc<RwLock<BeaconWhiskProposerRegistry>>,
/// Caches a map of `validator_index -> validator_pubkey`.
pub(crate) validator_pubkey_cache: TimeoutRwLock<ValidatorPubkeyCache<T>>,
/// A cache used when producing attestations.
Expand Down Expand Up @@ -3917,71 +3920,95 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(None);
}

// Compute the proposer index.
let head_epoch = cached_head.head_slot().epoch(T::EthSpec::slots_per_epoch());
let shuffling_decision_root = if head_epoch == proposal_epoch {
cached_head
.snapshot
.beacon_state
.proposer_shuffling_decision_root(proposer_head)?
} else {
proposer_head
};
let cached_proposer = self
.beacon_proposer_cache
.lock()
.get_slot::<T::EthSpec>(shuffling_decision_root, proposal_slot);
let proposer_index = if let Some(proposer) = cached_proposer {
proposer.index as u64
} else {
if head_epoch + 2 < proposal_epoch {
warn!(
self.log,
"Skipping proposer preparation";
"msg" => "this is a non-critical issue that can happen on unhealthy nodes or \
networks.",
"proposal_epoch" => proposal_epoch,
"head_epoch" => head_epoch,
);
// TODO: Any fork before whisk, refactor to be explicity
let proposer_index = if self.spec.fork_name_at_slot::<T::EthSpec>(proposal_slot)
!= ForkName::Capella
{
// Compute the proposer index.
let head_epoch = cached_head.head_slot().epoch(T::EthSpec::slots_per_epoch());
let shuffling_decision_root = if head_epoch == proposal_epoch {
cached_head
.snapshot
.beacon_state
.proposer_shuffling_decision_root(proposer_head)?
} else {
proposer_head
};

// Don't skip the head forward more than two epochs. This avoids burdening an
// unhealthy node.
//
// Although this node might miss out on preparing for a proposal, they should still
// be able to propose. This will prioritise beacon chain health over efficient
// packing of execution blocks.
return Ok(None);
}
let cached_proposer = self
.beacon_proposer_cache
.lock()
.get_slot::<T::EthSpec>(shuffling_decision_root, proposal_slot);
if let Some(proposer) = cached_proposer {
proposer.index as u64
} else {
if head_epoch + 2 < proposal_epoch {
warn!(
self.log,
"Skipping proposer preparation";
"msg" => "this is a non-critical issue that can happen on unhealthy nodes or \
networks.",
"proposal_epoch" => proposal_epoch,
"head_epoch" => head_epoch,
);

let (proposers, decision_root, _, fork) =
compute_proposer_duties_from_head(proposal_epoch, self)?;
// Don't skip the head forward more than two epochs. This avoids burdening an
// unhealthy node.
//
// Although this node might miss out on preparing for a proposal, they should still
// be able to propose. This will prioritise beacon chain health over efficient
// packing of execution blocks.
return Ok(None);
}

let proposer_offset = (proposal_slot % T::EthSpec::slots_per_epoch()).as_usize();
let proposer = *proposers
.get(proposer_offset)
.ok_or(BeaconChainError::NoProposerForSlot(proposal_slot))?;
let (proposers, decision_root, _, fork) =
compute_proposer_duties_from_head(proposal_epoch, self)?;

self.beacon_proposer_cache.lock().insert(
proposal_epoch,
decision_root,
proposers,
fork,
)?;
let proposer_offset = (proposal_slot % T::EthSpec::slots_per_epoch()).as_usize();
let proposer = *proposers
.get(proposer_offset)
.ok_or(BeaconChainError::NoProposerForSlot(proposal_slot))?;

// It's possible that the head changes whilst computing these duties. If so, abandon
// this routine since the change of head would have also spawned another instance of
// this routine.
//
// Exit now, after updating the cache.
if decision_root != shuffling_decision_root {
warn!(
self.log,
"Head changed during proposer preparation";
);
self.beacon_proposer_cache.lock().insert(
proposal_epoch,
decision_root,
proposers,
fork,
)?;

// It's possible that the head changes whilst computing these duties. If so, abandon
// this routine since the change of head would have also spawned another instance of
// this routine.
//
// Exit now, after updating the cache.
if decision_root != shuffling_decision_root {
warn!(
self.log,
"Head changed during proposer preparation";
);
return Ok(None);
}

proposer as u64
}
} else {
let whisk_shuffling_decision_root = cached_head
.snapshot
.beacon_state
.whisk_proposer_shuffling_decision_root(proposer_head)?;

// After whisk the beacon node cannot compute the beacon proposer
// A validator voluntarily declares in advance that it is the proposer of a slot
if let Some(proposer) = self
.beacon_whisk_proposer_registry
.read()
.get_slot::<T::EthSpec>(whisk_shuffling_decision_root, proposal_slot)
{
proposer.clone() as u64
} else {
// No known proposer for this fork and slot
return Ok(None);
}

proposer as u64
};

// Get the `prev_randao` and parent block number.
Expand Down Expand Up @@ -5031,6 +5058,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.contains_block(root)
}

/// Registers a validator self-declaration that it is the proposer for a whisk slot
pub fn register_whisk_proposer(
self: &Arc<Self>,
data: &WhiskProposerPreparationData,
) -> Result<(), beacon_whisk_proposer_registry::Error> {
self.beacon_whisk_proposer_registry.write().insert(
Slot::new(data.proposer_slot),
data.whisk_shuffling_decision_root.clone(),
data.validator_index as usize,
)
}

/// Determines the beacon proposer for the next slot. If that proposer is registered in the
/// `execution_layer`, provide the `execution_layer` with the necessary information to produce
/// `PayloadAttributes` for future calls to fork choice.
Expand Down Expand Up @@ -5646,6 +5685,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// sync anyway).
self.naive_aggregation_pool.write().prune(slot);
self.block_times_cache.write().prune(slot);
self.beacon_whisk_proposer_registry.write().prune(slot);

// Don't run heavy-weight tasks during sync.
if self.best_slot() + MAX_PER_SLOT_FORK_CHOICE_DISTANCE < slot {
Expand Down
98 changes: 98 additions & 0 deletions beacon_node/beacon_chain/src/beacon_whisk_proposer_registry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//! The `BeaconProposerCacheWhisk` stores the self-declarations of validators of when they will
//! propose.
//!
//! This cache is keyed by `(slot, block_root)` where `block_root` is the block root at
//! `end_slot(whisk_shuffling_period_epoch_start - 1)`. Proposer trackers are fixed during an entire
//! shuffling period, which is determinstic on whisk_shuffling_period_epoch_start.
//!
//! The information hold in this cache cannot be computed by the beacon node, so it cannot be
//! pruned. The beacon node must retain this data until its sure it cannot be used.

use std::collections::{hash_map::Entry, HashMap};
use types::{EthSpec, Slot, WhiskProposerShufflingRoot};

const RETAIN_OLD_SLOTS: Slot = Slot::new(128);
const MAX_SLOT_LOOKAHEAD: Slot = Slot::new(8192);

/// For some given slot, this contains the proposer index (`index`) and the `fork` that should be
/// used to verify their signature.
pub type Proposer = usize;

#[derive(Debug, Clone)]
pub enum Error {
ProposerAlreadyRegisteredForSlot,
SlotTooOld,
SlotTooAhead,
}

/// A cache to store validator declaration of which slot they are proposers post whisk.
///
/// See the module-level documentation for more information.
#[derive(Default)]
pub struct BeaconWhiskProposerRegistry {
registry: HashMap<Slot, HashMap<WhiskProposerShufflingRoot, Proposer>>,
pruned_slot: Slot,
}

impl BeaconWhiskProposerRegistry {
pub fn new(current_slot: Slot) -> Self {
Self {
registry: HashMap::new(),
pruned_slot: current_slot,
}
}

/// If it is cached, returns the proposer for the block at `slot` where the block has the
/// ancestor block root of `shuffling_decision_block` at `end_slot(slot.epoch() - 1)`.
pub fn get_slot<T: EthSpec>(
&self,
whisk_shuffling_decision_block: WhiskProposerShufflingRoot,
slot: Slot,
) -> Option<&Proposer> {
self.registry
.get(&slot)
.and_then(|entry| entry.get(&whisk_shuffling_decision_block))
}

/// Insert the proposers into the cache.
///
/// See `Self::get` for a description of `shuffling_decision_block`.
///
/// The `fork` value must be valid to verify proposer signatures in `epoch`.
pub fn insert(
&mut self,
slot: Slot,
whisk_shuffling_decision_block: WhiskProposerShufflingRoot,
proposer_index: Proposer,
) -> Result<(), Error> {
if slot < self.pruned_slot {
return Err(Error::SlotTooOld);
}
if slot > self.pruned_slot + RETAIN_OLD_SLOTS + MAX_SLOT_LOOKAHEAD {
return Err(Error::SlotTooAhead);
}

match self
.registry
.entry(slot)
.or_insert(HashMap::new())
.entry(whisk_shuffling_decision_block)
{
Entry::Vacant(entry) => entry.insert(proposer_index),
Entry::Occupied(_) => return Err(Error::ProposerAlreadyRegisteredForSlot),
};

Ok(())
}

/// Prune entries for slots that
pub fn prune(&mut self, current_slot: Slot) {
let up_to_slot = current_slot.saturating_sub(RETAIN_OLD_SLOTS);

for slot in self.pruned_slot.as_u64()..up_to_slot.as_u64() {
self.registry.remove(&Slot::new(slot));
}

self.pruned_slot = up_to_slot;
}
}
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,7 @@ where
)),
eth1_finalization_cache: TimeoutRwLock::new(Eth1FinalizationCache::new(log.clone())),
beacon_proposer_cache: <_>::default(),
beacon_whisk_proposer_registry: <_>::default(),
block_times_cache: <_>::default(),
pre_finalization_block_cache: <_>::default(),
validator_pubkey_cache: TimeoutRwLock::new(validator_pubkey_cache),
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod beacon_chain;
mod beacon_fork_choice_store;
pub mod beacon_proposer_cache;
mod beacon_snapshot;
pub mod beacon_whisk_proposer_registry;
pub mod block_reward;
mod block_times_cache;
mod block_verification;
Expand Down
52 changes: 51 additions & 1 deletion beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use types::{
ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof,
SignedBeaconBlock, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
SyncCommitteeMessage, SyncContributionData,
SyncCommitteeMessage, SyncContributionData, WhiskProposerPreparationData,
};
use version::{
add_consensus_version_header, execution_optimistic_finalized_fork_versioned_response,
Expand Down Expand Up @@ -238,6 +238,7 @@ pub fn prometheus_metrics() -> warp::filters::log::Log<impl Fn(warp::filters::lo
.or_else(|| starts_with("v1/beacon/pool/sync_committees"))
.or_else(|| starts_with("v1/beacon/blocks/head/root"))
.or_else(|| starts_with("v1/validator/prepare_beacon_proposer"))
.or_else(|| starts_with("v1/validator/prepare_beacon_whisk_proposer"))
.or_else(|| starts_with("v1/validator/register_validator"))
.or_else(|| starts_with("v1/beacon/"))
.or_else(|| starts_with("v2/beacon/"))
Expand Down Expand Up @@ -3167,6 +3168,54 @@ pub fn serve<T: BeaconChainTypes>(
},
);

// POST validator/prepare_beacon_whisk_proposer
let post_validator_prepare_beacon_whisk_proposer = eth_v1
.and(warp::path("validator"))
.and(warp::path("prepare_beacon_whisk_proposer"))
.and(warp::path::end())
.and(not_while_syncing_filter.clone())
.and(chain_filter.clone())
.and(log_filter.clone())
.and(warp::body::json())
.and_then(
|chain: Arc<BeaconChain<T>>,
log: Logger,
preparation_data: Vec<WhiskProposerPreparationData>| async move {
let current_slot = chain
.slot()
.map_err(warp_utils::reject::beacon_chain_error)?;

debug!(
log,
"Received proposer preparation data";
"count" => preparation_data.len(),
);

// Register proposer for slot with beacon chain
for data in preparation_data {
chain.register_whisk_proposer(&data).map_err(|e| {
warp_utils::reject::custom_bad_request(format!(
"error registering whisk proposer: {:?}",
e
))
})?;
}

// TODO WHISK: is this call necessary?
chain
.prepare_beacon_proposer(current_slot)
.await
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!(
"error updating proposer preparations: {:?}",
e
))
})?;

Ok::<_, warp::reject::Rejection>(warp::reply::json(&()).into_response())
},
);

// POST validator/register_validator
let post_validator_register_validator = eth_v1
.and(warp::path("validator"))
Expand Down Expand Up @@ -4000,6 +4049,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(post_validator_beacon_committee_subscriptions)
.uor(post_validator_sync_committee_subscriptions)
.uor(post_validator_prepare_beacon_proposer)
.uor(post_validator_prepare_beacon_whisk_proposer)
.uor(post_validator_register_validator)
.uor(post_lighthouse_liveness)
.uor(post_lighthouse_database_reconstruct)
Expand Down
Loading

0 comments on commit c9ae042

Please sign in to comment.