Skip to content

Commit

Permalink
Merge pull request #4 from dapplion/whisk-duties-perf
Browse files Browse the repository at this point in the history
Optimize duty performance
  • Loading branch information
dapplion authored Aug 23, 2023
2 parents 50af741 + bc6319f commit cc1543a
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 127 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crypto/curdleproofs_whisk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ fn serialize_shuffle_trackers(
}

pub struct WhiskTrackerG1Affine {
r_g: G1Affine,
k_r_g: G1Affine,
pub r_g: G1Affine,
pub k_r_g: G1Affine,
}

impl TryFrom<&WhiskTracker> for WhiskTrackerG1Affine {
Expand Down
1 change: 1 addition & 0 deletions validator_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ malloc_utils = { path = "../common/malloc_utils" }
sysinfo = "0.26.5"
system_health = { path = "../common/system_health" }
logging = { path = "../common/logging" }
lru = "0.7.1"
334 changes: 219 additions & 115 deletions validator_client/src/duties_service.rs

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions validator_client/src/http_metrics/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,18 @@ lazy_static::lazy_static! {
"vc_beacon_proposer_duties_reorged_total",
"Total count of proposer duties reorged, dependent root changed",
);
pub static ref PROPOSER_WHISK_DUTY_NOTIFIED_LATE: Result<IntCounter> = try_create_int_counter(
"vc_beacon_whisk_duty_notified_late",
"Total count of times a whisk is duty is notified on or after the duty's slot",
);
pub static ref PROPOSER_FIND_TRACKER_CACHE_HIT: Result<IntCounter> = try_create_int_counter(
"vc_beacon_find_tracker_cache_hit_total",
"Total count of cache hits for find tracker fn",
);
pub static ref PROPOSER_FIND_TRACKER_MATCH: Result<Histogram> = try_create_histogram(
"vc_beacon_find_tracker_match_times_seconds",
"Time to find tracker match in seconds",
);
/*
* Endpoint metrics
*/
Expand Down
82 changes: 78 additions & 4 deletions validator_client/src/initialized_validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@ use lockfile::{Lockfile, LockfileError};
use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
use reqwest::{Certificate, Client, Error as ReqwestError, Identity};
use slog::{debug, error, info, warn, Logger};
use std::collections::{HashMap, HashSet};
use std::fs::{self, File};
use std::io::{self, Read};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use std::{
collections::{hash_map::DefaultHasher, HashMap, HashSet},
hash::Hash,
};
use std::{
fs::{self, File},
hash::Hasher,
};
use types::graffiti::GraffitiString;
use types::{Address, Graffiti, Keypair, PublicKey, PublicKeyBytes};
use url::{ParseError, Url};
Expand All @@ -46,6 +53,8 @@ const DEFAULT_REMOTE_SIGNER_REQUEST_TIMEOUT: Duration = Duration::from_secs(12);
// Use TTY instead of stdin to capture passwords from users.
const USE_STDIN: bool = false;

pub type PubkeysHash = u64;

pub enum OnDecryptFailure {
/// If the key cache fails to decrypt, create a new cache.
CreateNew,
Expand Down Expand Up @@ -486,7 +495,7 @@ pub struct InitializedValidators {
/// The directory that the `self.definitions` will be saved into.
validators_dir: PathBuf,
/// The canonical set of validators.
validators: HashMap<PublicKeyBytes, InitializedValidator>,
validators: TrackedHashMap<PublicKeyBytes, InitializedValidator>,
/// The clients used for communications with a remote signer.
web3_signer_client_map: Option<HashMap<Web3SignerDefinition, Client>>,
/// For logging via `slog`.
Expand All @@ -503,14 +512,18 @@ impl InitializedValidators {
let mut this = Self {
validators_dir,
definitions,
validators: HashMap::default(),
validators: TrackedHashMap::new(),
web3_signer_client_map: None,
log,
};
this.update_validators().await?;
Ok(this)
}

pub fn voting_pubkeys_hash(&self) -> PubkeysHash {
self.validators.keys_hash
}

/// The count of enabled validators contained in `self`.
pub fn num_enabled(&self) -> usize {
self.validators.len()
Expand Down Expand Up @@ -1359,3 +1372,64 @@ impl InitializedValidators {
self.definitions.as_mut_slice()
}
}

/// Tracks a cumulative hash of the HashMap keys.
/// All read-only methods are available through the DeRef trait.
/// Mutable methods are implemented only in demand to the `initialized_validators` use.
struct TrackedHashMap<K, V>
where
K: Eq + Hash,
{
map: HashMap<K, V>,
keys_hash: u64, // Hash value to track changes
}

impl<K, V> TrackedHashMap<K, V>
where
K: Eq + Hash,
{
fn new() -> Self {
Self {
map: HashMap::new(),
keys_hash: 0,
}
}

pub fn insert(&mut self, key: K, value: V) -> Option<V> {
let key_hash = TrackedHashMap::<K, V>::hash_key(&key);
let value = self.map.insert(key, value);
if value.is_none() {
self.keys_hash ^= key_hash; // XOR to update the hash
}
value
}

pub fn remove(&mut self, key: &K) -> Option<V> {
let value = self.map.remove(key);
if value.is_some() {
self.keys_hash ^= TrackedHashMap::<K, V>::hash_key(key) // XOR again to revert the hash change
}
value
}

pub fn get_mut(&mut self, k: &K) -> Option<&mut V> {
self.map.get_mut(k)
}

fn hash_key(key: &K) -> u64 {
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
hasher.finish()
}
}

impl<K, V> Deref for TrackedHashMap<K, V>
where
K: Eq + Hash,
{
type Target = HashMap<K, V>;

fn deref(&self) -> &Self::Target {
&self.map
}
}
2 changes: 2 additions & 0 deletions validator_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use duties_service::DutiesService;
use environment::RuntimeContext;
use eth2::{reqwest::ClientBuilder, types::Graffiti, BeaconNodeHttpClient, StatusCode, Timeouts};
use http_api::ApiSecret;
use lru::LruCache;
use notifier::spawn_notifier;
use parking_lot::RwLock;
use preparation_service::{PreparationService, PreparationServiceBuilder};
Expand Down Expand Up @@ -446,6 +447,7 @@ impl<T: EthSpec> ProductionValidatorClient<T> {
attesters: <_>::default(),
proposers: <_>::default(),
sync_duties: <_>::default(),
whisk_tracker_cache: RwLock::new(LruCache::new(3 * T::slots_per_epoch() as usize)),
slot_clock: slot_clock.clone(),
beacon_nodes: beacon_nodes.clone(),
validator_store: validator_store.clone(),
Expand Down
15 changes: 9 additions & 6 deletions validator_client/src/validator_store.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::{
doppelganger_service::DoppelgangerService,
http_metrics::metrics,
initialized_validators::InitializedValidators,
initialized_validators::{InitializedValidators, PubkeysHash},
signing_method::{Error as SigningError, SignableMessage, SigningContext, SigningMethod},
Config,
};
use account_utils::validator_definitions::{PasswordStorage, ValidatorDefinition};
use curdleproofs_whisk::{bls_g1_scalar_multiply, FieldElementBytes, G1Affine};
use curdleproofs_whisk::{bls_g1_scalar_multiply, FieldElementBytes, WhiskTrackerG1Affine};
use parking_lot::{Mutex, RwLock};
use slashing_protection::{
interchange::Interchange, InterchangeError, NotSafe, Safe, SlashingDatabase,
Expand Down Expand Up @@ -342,6 +342,10 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
self.validators.read().num_enabled()
}

pub fn voting_pubkeys_hash(&self) -> PubkeysHash {
self.validators.read().voting_pubkeys_hash()
}

fn fork(&self, epoch: Epoch) -> Fork {
self.spec.fork_at_epoch(epoch)
}
Expand Down Expand Up @@ -879,8 +883,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
/// Given a whisk tracker, check if one of the registered validators matches it
pub fn find_whisk_tracker_match(
&self,
r_g: G1Affine,
k_r_g: G1Affine,
tracker: &WhiskTrackerG1Affine,
) -> Option<(PublicKeyBytes, Option<u64>)> {
self.validators
.read()
Expand All @@ -889,8 +892,8 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore<T, E> {
// TODO: Should the validator track if the proposer has already submited
// the first whisk proposal? For now always check the two possible k.
// Note that a proposer may proposer twice in a row, so has to be fork aware
bls_g1_scalar_multiply(&r_g, proposer_k) == k_r_g ||
bls_g1_scalar_multiply(&r_g, initial_proposer_k) == k_r_g)
bls_g1_scalar_multiply(&tracker.r_g, proposer_k) == tracker.k_r_g ||
bls_g1_scalar_multiply(&tracker.r_g, initial_proposer_k) == tracker.k_r_g)
.map(|(pubkey, index, _, _)| (*pubkey, *index))
}

Expand Down

0 comments on commit cc1543a

Please sign in to comment.