diff --git a/Cargo.lock b/Cargo.lock index c19ae8f0d32..a07a8efe682 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8789,6 +8789,7 @@ dependencies = [ "lighthouse_version", "lockfile", "logging", + "lru 0.7.8", "malloc_utils", "monitoring_api", "parking_lot 0.12.1", diff --git a/crypto/curdleproofs_whisk/src/lib.rs b/crypto/curdleproofs_whisk/src/lib.rs index b891ab4c399..21929d7d7d6 100644 --- a/crypto/curdleproofs_whisk/src/lib.rs +++ b/crypto/curdleproofs_whisk/src/lib.rs @@ -175,8 +175,8 @@ fn serialize_shuffle_trackers( } pub struct WhiskTrackerG1Affine { - r_g: G1Affine, - k_r_g: G1Affine, + pub r_g: G1Affine, + pub k_r_g: G1Affine, } impl TryFrom<&WhiskTracker> for WhiskTrackerG1Affine { diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index 3f32645107d..0cc81203ee7 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -64,3 +64,4 @@ malloc_utils = { path = "../common/malloc_utils" } sysinfo = "0.26.5" system_health = { path = "../common/system_health" } logging = { path = "../common/logging" } +lru = "0.7.1" diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 7371a6fbb7c..c84d597b1ab 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -10,17 +10,19 @@ mod sync; use crate::beacon_node_fallback::{BeaconNodeFallback, Errors, OfflineOnFailure, RequireSynced}; use crate::http_metrics::metrics::{get_int_gauge, set_int_gauge, ATTESTATION_DUTY}; +use crate::initialized_validators::PubkeysHash; use crate::{ block_service::BlockServiceNotification, http_metrics::metrics, validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore}, }; -use curdleproofs_whisk::G1Affine; +use curdleproofs_whisk::WhiskTracker; use environment::RuntimeContext; use eth2::types::{ AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId, }; use futures::{stream, StreamExt}; +use lru::LruCache; use parking_lot::RwLock; use safe_arith::ArithError; use slog::{debug, error, info, warn, Logger}; @@ -85,6 +87,7 @@ impl From for Error { #[derive(Debug)] enum PollError { InvalidProposerTracker(curdleproofs_whisk::SerializationError), + NotCorrectLength(usize), BeaconApiErrors(Errors), } @@ -98,6 +101,7 @@ impl std::fmt::Display for PollError { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { PollError::InvalidProposerTracker(e) => write!(f, "InvalidProposerTracker: {}", e), + PollError::NotCorrectLength(l) => write!(f, "NotCorrectLength: {}", l), PollError::BeaconApiErrors(e) => write!(f, "BeaconApiErrors: {}", e), } } @@ -153,10 +157,16 @@ impl DutyAndProof { /// To assist with readability, the dependent root for attester/proposer duties. type DependentRoot = Hash256; +type ValidatorIndex = u64; type AttesterMap = HashMap>; type ProposerMap = HashMap)>; +/// Caches the outcome of find_tracker function, indexed by whisk tracker and validating pubkeys +/// hash. A cached value of None means no match for this set of keypairs. +pub(crate) type WhiskTrackerCache = + LruCache<(WhiskTracker, PubkeysHash), Option<(PublicKeyBytes, ValidatorIndex)>>; + /// See the module-level documentation. pub struct DutiesService { /// Maps a validator public key to their duties for each epoch. @@ -166,6 +176,8 @@ pub struct DutiesService { pub proposers: RwLock, /// Map from validator index to sync committee duties. pub sync_duties: SyncDutiesMap, + /// Cache of whisk tracker computations + pub(crate) whisk_tracker_cache: RwLock, /// Provides the canonical list of locally-managed validators. pub validator_store: Arc>, /// Tracks the current slot. @@ -1091,100 +1103,124 @@ async fn poll_beacon_proposers( false }; - let response = if is_post_whisk { - let response = poll_beacon_proposers_post_whisk(duties_service, current_slot).await; + if is_post_whisk { + let response = poll_beacon_whisk_proposer_trackers(duties_service, current_epoch).await; // Notify beacon node that validator has duties for future blocks // TODO WHISK: This routine should happen BEFORE the block to allow time to prepare - if let Ok((dependant_root, duties)) = response.as_ref() { - if !duties.is_empty() { - let preparation_data = duties - .iter() - .map(|duty| WhiskProposerPreparationData { - proposer_slot: duty.slot.into(), - validator_index: duty.validator_index, - whisk_shuffling_decision_root: *dependant_root, - }) - .collect::>(); - debug!( - log, - "Registering beacon whisk proposers with beacon node"; - "preparation_data" => ?preparation_data - ); + if let Ok((dependent_root, trackers)) = response.as_ref() { + let start_slot = current_epoch.start_slot(E::slots_per_epoch()); + + for (slot_index, tracker) in trackers.iter().enumerate() { + let slot = start_slot + Slot::new(slot_index as u64); + + // Expensive operation to check if tracker matches any of the registered validators + match find_whisk_tracker_match_cached(duties_service, tracker) { + Err(e) => { + error!(log, "Invalid tracker polled from BN"; "err" => %e) + } + Ok(None) => { + // Ok, no match found for this tracker + } + Ok(Some((pubkey, validator_index))) => { + // Found proposer for this slot + debug!( + log, + "Found matching whisk tracker proposer duties"; + "dependent_root" => %dependent_root.0, + "validator_index" => validator_index, + "slot" => slot, + ); + let relevant_duty = ProposerData { + pubkey, + slot, + validator_index, + }; + + let inserted_new_duty = insert_relevant_duties( + duties_service, + dependent_root.0, + current_epoch, + &relevant_duty, + ); + if inserted_new_duty && slot >= current_slot { + // Notify block production service if we discovered a new duty for this slot + notify_block_production_service( + current_slot, + &HashSet::from_iter(vec![pubkey]), + block_service_tx, + &duties_service.validator_store, + log, + ) + .await; + debug!( + log, + "Detected new block whisk proposer late"; + "current_slot" => current_slot, + "slot" => slot, + "proposer_index" => validator_index, + ); + metrics::inc_counter(&metrics::PROPOSER_WHISK_DUTY_NOTIFIED_LATE); + } - if let Err(e) = duties_service - .beacon_nodes - .first_success(RequireSynced::No, OfflineOnFailure::Yes, |beacon_node| { - let preparation_data = preparation_data.clone(); - async move { - beacon_node - .post_validator_prepare_beacon_whisk_proposer(&preparation_data) - .await + // TODO WHISK: Do not await this call and interrupt the calculation of duties + if let Err(e) = duties_service + .beacon_nodes + .first_success( + RequireSynced::No, + OfflineOnFailure::Yes, + |beacon_node| async move { + beacon_node + .post_validator_prepare_beacon_whisk_proposer(&[ + WhiskProposerPreparationData { + validator_index, + proposer_slot: slot.as_u64(), + whisk_shuffling_decision_root: *dependent_root, + }, + ]) + .await + }, + ) + .await + { + error!( + log, + "Failed to prepare beacon whisk proposer"; + "err" => %e, + ) } - }) - .await - { - error!( - log, - "Failed to prepare beacon whisk proposer"; - "err" => %e, - ) + } } } } - - // TODO WHISK: Casting dependant_root here to prevent having to modify more code - response.map(|(dependant_root, duties)| (dependant_root.0, duties)) } else { - poll_beacon_proposers_before_whisk(duties_service, current_epoch).await - }; + let response = poll_beacon_proposers_before_whisk(duties_service, current_epoch).await; - match response { - Ok((dependent_root, relevant_duties)) => { - let num_relevant_duties = relevant_duties.len(); - debug!( - log, - "Downloaded proposer duties"; - "dependent_root" => %dependent_root, - "num_relevant_duties" => num_relevant_duties, - ); - - let added_new_duties = if let Some((prior_dependent_root, _)) = duties_service - .proposers - .write() - .insert(current_epoch, (dependent_root, relevant_duties)) - { - if dependent_root != prior_dependent_root { - warn!( - log, - "Proposer duties re-org"; - "prior_dependent_root" => %prior_dependent_root, - "dependent_root" => %dependent_root, - "msg" => "this may happen from time to time" + match response { + Ok((dependent_root, relevant_duties)) => { + debug!( + log, + "Downloaded proposer duties"; + "dependent_root" => %dependent_root, + "num_relevant_duties" => relevant_duties.len(), + ); + for relevant_duty in relevant_duties { + insert_relevant_duties( + duties_service, + dependent_root, + current_epoch, + &relevant_duty, ); - metrics::inc_counter(&metrics::PROPOSER_DUTIES_REORGED); - true // register re-orged duties as new - } else { - false // duties entry existed for same epoch and dependant root } - } else { - true // new unknown entry for epoch - }; - - if added_new_duties { - metrics::inc_counter_by( - &metrics::PROPOSER_DUTIES_NEW, - num_relevant_duties as u64, - ); } - } - // Don't return early here, we still want to try and produce blocks using the cached values. - Err(e) => error!( - log, - "Failed to download proposer duties"; - "err" => %e, - ), - } + // Don't return early here, we still want to try and produce blocks using the cached values. + Err(e) => error!( + log, + "Failed to download proposer duties"; + "err" => %e, + ), + }; + }; // Compute the block proposers for this slot again, now that we've received an update from // the BN. @@ -1228,6 +1264,94 @@ async fn poll_beacon_proposers( Ok(()) } +/// Find tracker owner, and cache the result in an LRU cache. +/// This function does many G1 multiplications, which cost ~200 us each. +/// For validator clients with more than hundreds of keys this operation becomes +/// really expensive. +/// +/// TODO WHISK: multithread finding trackers +fn find_whisk_tracker_match_cached( + duties_service: &DutiesService, + tracker: &WhiskTracker, +) -> Result, PollError> { + // TODO WHISK: voting pubkeys hash _could_ change during calculation find matching + // tracker, but it's unlikely so have an impact + let voting_pubkeys_hash = duties_service.validator_store.voting_pubkeys_hash(); + + // Check tracker cache first to avoid expensive operation + if let Some(value) = duties_service + .whisk_tracker_cache + .write() + .get(&(tracker.clone(), voting_pubkeys_hash)) + { + metrics::inc_counter(&metrics::PROPOSER_FIND_TRACKER_CACHE_HIT); + return Ok(*value); + } + + // Assume worst case where find need to iterate over all + + let timer = metrics::start_timer(&metrics::PROPOSER_FIND_TRACKER_MATCH); + let proposer_at_slot = duties_service + .validator_store + .find_whisk_tracker_match( + &tracker + .try_into() + .map_err(PollError::InvalidProposerTracker)?, + ) + .map(|(pubkey, validator_index)| { + ( + pubkey, + validator_index.expect("whisk validator should have index defined"), + ) + }); + drop(timer); + + // Cache computed value + duties_service + .whisk_tracker_cache + .write() + .put((tracker.clone(), voting_pubkeys_hash), proposer_at_slot); + + Ok(proposer_at_slot) +} + +/// Inserts a partial set of duties for `epoch` and `dependent_root`. +/// De-duplicates inserted duties with existing duties. +fn insert_relevant_duties( + duties_service: &DutiesService, + dependent_root: Hash256, + epoch: Epoch, + relevant_duty: &ProposerData, +) -> bool { + let mut proposers = duties_service.proposers.write(); + let (_, duties_at_epoch) = proposers + .entry(epoch) + .and_modify(|(prior_dependent_root, duties_at_epoch)| { + if &dependent_root != prior_dependent_root { + warn!( + duties_service.context.log(), + "Proposer duties re-org"; + "prior_dependent_root" => %prior_dependent_root, + "dependent_root" => %dependent_root, + "msg" => "this may happen from time to time" + ); + metrics::inc_counter(&metrics::PROPOSER_DUTIES_REORGED); + + *prior_dependent_root = dependent_root; + duties_at_epoch.clear(); + } + }) + .or_insert_with(|| (dependent_root, vec![])); + + if duties_at_epoch.contains(relevant_duty) { + false + } else { + duties_at_epoch.push(relevant_duty.clone()); + metrics::inc_counter(&metrics::PROPOSER_DUTIES_NEW); + true + } +} + /// Poll beacon proposers from `GET validator/duties/proposer/{epoch}` /// Filter relevant duties by their declared pubkey async fn poll_beacon_proposers_before_whisk( @@ -1274,10 +1398,11 @@ async fn poll_beacon_proposers_before_whisk( /// Note: Trackers are static for 8192 slots, so polling that often wastes bandwith /// for simplicity of the flow. A future optimization should cache the trackers /// query and check progressively every epoch. -async fn poll_beacon_proposers_post_whisk( +async fn poll_beacon_whisk_proposer_trackers( duties_service: &DutiesService, - current_slot: Slot, -) -> Result<(WhiskProposerShufflingRoot, Vec), PollError> { + epoch: Epoch, +) -> Result<(WhiskProposerShufflingRoot, Vec), PollError> { + let start_slot = epoch.start_slot(E::slots_per_epoch()); let response = duties_service .beacon_nodes .first_success( @@ -1289,47 +1414,26 @@ async fn poll_beacon_proposers_post_whisk( &[metrics::PROPOSER_DUTIES_HTTP_GET], ); beacon_node - .get_beacon_states_proposer_trackers(StateId::Slot(current_slot)) + .get_beacon_states_proposer_trackers(StateId::Slot(start_slot)) .await }, ) .await?; - let dependent_root = response.dependent_root; - let current_slot_index = current_slot % E::whisk_proposer_trackers_count() as u64; - let first_slot_tracker = current_slot - current_slot_index; + let start_slot_index = start_slot % E::whisk_proposer_trackers_count() as u64; - let relevant_duties = response + let trackers = response .data .into_iter() - .enumerate() - .skip(current_slot_index.as_usize()) + .skip(start_slot_index.as_usize()) .take(E::slots_per_epoch() as usize) - .filter_map(|(slot_index, tracker)| { - let r_g: G1Affine = match (&tracker.r_g).try_into() { - Ok(r_g) => r_g, - Err(e) => return Some(Err(PollError::InvalidProposerTracker(e))), - }; - let k_r_g: G1Affine = match (&tracker.k_r_g).try_into() { - Ok(k_r_g) => k_r_g, - Err(e) => return Some(Err(PollError::InvalidProposerTracker(e))), - }; + .collect::>(); - duties_service - .validator_store - .find_whisk_tracker_match(r_g, k_r_g) - .map(|(pubkey, validator_index)| { - Ok(ProposerData { - pubkey, - validator_index: validator_index - .expect("whisk validator should have index defined"), - slot: first_slot_tracker + slot_index as u64, - }) - }) - }) - .collect::, PollError>>()?; + if trackers.len() != (E::slots_per_epoch() as usize) { + return Err(PollError::NotCorrectLength(trackers.len())); + } - Ok((dependent_root, relevant_duties)) + Ok((response.dependent_root, trackers)) } /// Notify the block service if it should produce a block. diff --git a/validator_client/src/http_metrics/metrics.rs b/validator_client/src/http_metrics/metrics.rs index c0916c6f9ca..0716fc7b3ee 100644 --- a/validator_client/src/http_metrics/metrics.rs +++ b/validator_client/src/http_metrics/metrics.rs @@ -139,6 +139,18 @@ lazy_static::lazy_static! { "vc_beacon_proposer_duties_reorged_total", "Total count of proposer duties reorged, dependent root changed", ); + pub static ref PROPOSER_WHISK_DUTY_NOTIFIED_LATE: Result = try_create_int_counter( + "vc_beacon_whisk_duty_notified_late", + "Total count of times a whisk is duty is notified on or after the duty's slot", + ); + pub static ref PROPOSER_FIND_TRACKER_CACHE_HIT: Result = try_create_int_counter( + "vc_beacon_find_tracker_cache_hit_total", + "Total count of cache hits for find tracker fn", + ); + pub static ref PROPOSER_FIND_TRACKER_MATCH: Result = try_create_histogram( + "vc_beacon_find_tracker_match_times_seconds", + "Time to find tracker match in seconds", + ); /* * Endpoint metrics */ diff --git a/validator_client/src/initialized_validators.rs b/validator_client/src/initialized_validators.rs index b8c2b95201f..b24c9d70039 100644 --- a/validator_client/src/initialized_validators.rs +++ b/validator_client/src/initialized_validators.rs @@ -23,12 +23,19 @@ use lockfile::{Lockfile, LockfileError}; use parking_lot::{MappedMutexGuard, Mutex, MutexGuard}; use reqwest::{Certificate, Client, Error as ReqwestError, Identity}; use slog::{debug, error, info, warn, Logger}; -use std::collections::{HashMap, HashSet}; -use std::fs::{self, File}; use std::io::{self, Read}; +use std::ops::Deref; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; +use std::{ + collections::{hash_map::DefaultHasher, HashMap, HashSet}, + hash::Hash, +}; +use std::{ + fs::{self, File}, + hash::Hasher, +}; use types::graffiti::GraffitiString; use types::{Address, Graffiti, Keypair, PublicKey, PublicKeyBytes}; use url::{ParseError, Url}; @@ -46,6 +53,8 @@ const DEFAULT_REMOTE_SIGNER_REQUEST_TIMEOUT: Duration = Duration::from_secs(12); // Use TTY instead of stdin to capture passwords from users. const USE_STDIN: bool = false; +pub type PubkeysHash = u64; + pub enum OnDecryptFailure { /// If the key cache fails to decrypt, create a new cache. CreateNew, @@ -486,7 +495,7 @@ pub struct InitializedValidators { /// The directory that the `self.definitions` will be saved into. validators_dir: PathBuf, /// The canonical set of validators. - validators: HashMap, + validators: TrackedHashMap, /// The clients used for communications with a remote signer. web3_signer_client_map: Option>, /// For logging via `slog`. @@ -503,7 +512,7 @@ impl InitializedValidators { let mut this = Self { validators_dir, definitions, - validators: HashMap::default(), + validators: TrackedHashMap::new(), web3_signer_client_map: None, log, }; @@ -511,6 +520,10 @@ impl InitializedValidators { Ok(this) } + pub fn voting_pubkeys_hash(&self) -> PubkeysHash { + self.validators.keys_hash + } + /// The count of enabled validators contained in `self`. pub fn num_enabled(&self) -> usize { self.validators.len() @@ -1359,3 +1372,64 @@ impl InitializedValidators { self.definitions.as_mut_slice() } } + +/// Tracks a cumulative hash of the HashMap keys. +/// All read-only methods are available through the DeRef trait. +/// Mutable methods are implemented only in demand to the `initialized_validators` use. +struct TrackedHashMap +where + K: Eq + Hash, +{ + map: HashMap, + keys_hash: u64, // Hash value to track changes +} + +impl TrackedHashMap +where + K: Eq + Hash, +{ + fn new() -> Self { + Self { + map: HashMap::new(), + keys_hash: 0, + } + } + + pub fn insert(&mut self, key: K, value: V) -> Option { + let key_hash = TrackedHashMap::::hash_key(&key); + let value = self.map.insert(key, value); + if value.is_none() { + self.keys_hash ^= key_hash; // XOR to update the hash + } + value + } + + pub fn remove(&mut self, key: &K) -> Option { + let value = self.map.remove(key); + if value.is_some() { + self.keys_hash ^= TrackedHashMap::::hash_key(key) // XOR again to revert the hash change + } + value + } + + pub fn get_mut(&mut self, k: &K) -> Option<&mut V> { + self.map.get_mut(k) + } + + fn hash_key(key: &K) -> u64 { + let mut hasher = DefaultHasher::new(); + key.hash(&mut hasher); + hasher.finish() + } +} + +impl Deref for TrackedHashMap +where + K: Eq + Hash, +{ + type Target = HashMap; + + fn deref(&self) -> &Self::Target { + &self.map + } +} diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 6f071055a4a..227374fb4e6 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -42,6 +42,7 @@ use duties_service::DutiesService; use environment::RuntimeContext; use eth2::{reqwest::ClientBuilder, types::Graffiti, BeaconNodeHttpClient, StatusCode, Timeouts}; use http_api::ApiSecret; +use lru::LruCache; use notifier::spawn_notifier; use parking_lot::RwLock; use preparation_service::{PreparationService, PreparationServiceBuilder}; @@ -446,6 +447,7 @@ impl ProductionValidatorClient { attesters: <_>::default(), proposers: <_>::default(), sync_duties: <_>::default(), + whisk_tracker_cache: RwLock::new(LruCache::new(3 * T::slots_per_epoch() as usize)), slot_clock: slot_clock.clone(), beacon_nodes: beacon_nodes.clone(), validator_store: validator_store.clone(), diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index 34530a387ae..9721e17f932 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -1,12 +1,12 @@ use crate::{ doppelganger_service::DoppelgangerService, http_metrics::metrics, - initialized_validators::InitializedValidators, + initialized_validators::{InitializedValidators, PubkeysHash}, signing_method::{Error as SigningError, SignableMessage, SigningContext, SigningMethod}, Config, }; use account_utils::validator_definitions::{PasswordStorage, ValidatorDefinition}; -use curdleproofs_whisk::{bls_g1_scalar_multiply, FieldElementBytes, G1Affine}; +use curdleproofs_whisk::{bls_g1_scalar_multiply, FieldElementBytes, WhiskTrackerG1Affine}; use parking_lot::{Mutex, RwLock}; use slashing_protection::{ interchange::Interchange, InterchangeError, NotSafe, Safe, SlashingDatabase, @@ -342,6 +342,10 @@ impl ValidatorStore { self.validators.read().num_enabled() } + pub fn voting_pubkeys_hash(&self) -> PubkeysHash { + self.validators.read().voting_pubkeys_hash() + } + fn fork(&self, epoch: Epoch) -> Fork { self.spec.fork_at_epoch(epoch) } @@ -879,8 +883,7 @@ impl ValidatorStore { /// Given a whisk tracker, check if one of the registered validators matches it pub fn find_whisk_tracker_match( &self, - r_g: G1Affine, - k_r_g: G1Affine, + tracker: &WhiskTrackerG1Affine, ) -> Option<(PublicKeyBytes, Option)> { self.validators .read() @@ -889,8 +892,8 @@ impl ValidatorStore { // TODO: Should the validator track if the proposer has already submited // the first whisk proposal? For now always check the two possible k. // Note that a proposer may proposer twice in a row, so has to be fork aware - bls_g1_scalar_multiply(&r_g, proposer_k) == k_r_g || - bls_g1_scalar_multiply(&r_g, initial_proposer_k) == k_r_g) + bls_g1_scalar_multiply(&tracker.r_g, proposer_k) == tracker.k_r_g || + bls_g1_scalar_multiply(&tracker.r_g, initial_proposer_k) == tracker.k_r_g) .map(|(pubkey, index, _, _)| (*pubkey, *index)) }