diff --git a/Cargo.lock b/Cargo.lock
index 5fcba6b2643..44ca67e9b47 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2535,6 +2535,7 @@ version = "0.1.0"
 dependencies = [
  "account_utils",
  "bytes",
+ "derivative",
  "eth2_keystore",
  "ethereum_serde_utils",
  "ethereum_ssz",
diff --git a/book/src/help_vc.md b/book/src/help_vc.md
index 7f2cfab8e3a..23a84919936 100644
--- a/book/src/help_vc.md
+++ b/book/src/help_vc.md
@@ -177,6 +177,22 @@ Options:
           Default is unlimited.
 
 Flags:
+      --beacon-nodes-sync-tolerances
+          A comma-separated list of 3 values which sets the size of each sync
+          distance range when determining the health of each connected beacon
+          node. The first value determines the `Synced` range. If a connected
+          beacon node is synced to within this number of slots it is considered
+          'Synced'. The second value determines the `Small` sync distance
+          range. This range starts immediately after the `Synced` range. The
+          third value determines the `Medium` sync distance range. This range
+          starts immediately after the `Small` range. Any sync distance value
+          beyond that is considered `Large`. For example, a value of `8,8,48`
+          would have ranges like the following: `Synced`: 0..=8 `Small`: 9..=16
+          `Medium`: 17..=64 `Large`: 65.. These values are used to determine
+          what ordering beacon node fallbacks are used in. Generally, `Synced`
+          nodes are preferred over `Small` and so on. Nodes in the `Synced`
+          range will tie-break based on their ordering in `--beacon-nodes`. This
+          ensures the primary beacon node is prioritised. [default: 8,8,48]
       --builder-proposals
           If this flag is set, Lighthouse will query the Beacon Node for only
           block headers during proposals and will sign over headers. Useful for
diff --git a/common/eth2/Cargo.toml b/common/eth2/Cargo.toml
index 10b4755ba26..d23a4068f1b 100644
--- a/common/eth2/Cargo.toml
+++ b/common/eth2/Cargo.toml
@@ -29,6 +29,7 @@ store = { workspace = true }
 slashing_protection = { workspace = true }
 mediatype = "0.19.13"
 pretty_reqwest_error = { workspace = true }
+derivative = { workspace = true }
 
 [dev-dependencies]
 tokio = { workspace = true }
diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs
index 2805d36b90c..522c6414eae 100644
--- a/common/eth2/src/lib.rs
+++ b/common/eth2/src/lib.rs
@@ -16,6 +16,7 @@ pub mod types;
 
 use self::mixin::{RequestAccept, ResponseOptional};
 use self::types::{Error as ResponseError, *};
+use derivative::Derivative;
 use futures::Stream;
 use futures_util::StreamExt;
 use lighthouse_network::PeerId;
@@ -117,7 +118,7 @@ impl fmt::Display for Error {
 
 /// A struct to define a variety of different timeouts for different validator tasks to ensure
 /// proper fallback behaviour.
-#[derive(Clone)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub struct Timeouts {
     pub attestation: Duration,
     pub attester_duties: Duration,
@@ -154,13 +155,17 @@ impl Timeouts {
 
 /// A wrapper around `reqwest::Client` which provides convenience methods for interfacing with a
 /// Lighthouse Beacon Node HTTP server (`http_api`).
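(Aside: the hunk just below is why `derivative` appears as a new dependency above — it lets `BeaconNodeHttpClient` implement `PartialEq` while skipping its `reqwest::Client` field, which has no `PartialEq` of its own. A hand-written sketch of roughly what the derive produces, for illustration only and not part of this patch:)

```rust
impl PartialEq for BeaconNodeHttpClient {
    fn eq(&self, other: &Self) -> bool {
        // `client` is skipped via `#[derivative(PartialEq = "ignore")]`, so
        // equality is decided by the server URL and the configured timeouts.
        self.server == other.server && self.timeouts == other.timeouts
    }
}
```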
-#[derive(Clone)]
+#[derive(Clone, Debug, Derivative)]
+#[derivative(PartialEq)]
 pub struct BeaconNodeHttpClient {
+    #[derivative(PartialEq = "ignore")]
     client: reqwest::Client,
     server: SensitiveUrl,
     timeouts: Timeouts,
 }
 
+impl Eq for BeaconNodeHttpClient {}
+
 impl fmt::Display for BeaconNodeHttpClient {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         self.server.fmt(f)
diff --git a/lighthouse/tests/validator_client.rs b/lighthouse/tests/validator_client.rs
index cb16ca4792c..baf50aa7c07 100644
--- a/lighthouse/tests/validator_client.rs
+++ b/lighthouse/tests/validator_client.rs
@@ -1,4 +1,6 @@
-use validator_client::{config::DEFAULT_WEB3SIGNER_KEEP_ALIVE, ApiTopic, Config};
+use validator_client::{
+    config::DEFAULT_WEB3SIGNER_KEEP_ALIVE, ApiTopic, BeaconNodeSyncDistanceTiers, Config,
+};
 
 use crate::exec::CommandLineTestExec;
 use bls::{Keypair, PublicKeyBytes};
@@ -12,7 +14,7 @@ use std::str::FromStr;
 use std::string::ToString;
 use std::time::Duration;
 use tempfile::TempDir;
-use types::Address;
+use types::{Address, Slot};
 
 /// Returns the `lighthouse validator_client` command.
 fn base_cmd() -> Command {
@@ -511,7 +513,6 @@ fn monitoring_endpoint() {
         assert_eq!(api_conf.update_period_secs, Some(30));
     });
 }
-
 #[test]
 fn disable_run_on_all_flag() {
     CommandLineTest::new()
@@ -572,6 +573,33 @@ fn broadcast_flag() {
     });
 }
 
+/// Tests for validator fallback flags.
+#[test]
+fn beacon_nodes_sync_tolerances_flag_default() {
+    CommandLineTest::new().run().with_config(|config| {
+        assert_eq!(
+            config.beacon_node_fallback.sync_tolerances,
+            BeaconNodeSyncDistanceTiers::default()
+        )
+    });
+}
+#[test]
+fn beacon_nodes_sync_tolerances_flag() {
+    CommandLineTest::new()
+        .flag("beacon-nodes-sync-tolerances", Some("4,4,4"))
+        .run()
+        .with_config(|config| {
+            assert_eq!(
+                config.beacon_node_fallback.sync_tolerances,
+                BeaconNodeSyncDistanceTiers {
+                    synced: Slot::new(4),
+                    small: Slot::new(8),
+                    medium: Slot::new(12),
+                }
+            );
+        });
+}
+
 #[test]
 #[should_panic(expected = "Unknown API topic")]
 fn wrong_broadcast_flag() {
diff --git a/testing/simulator/src/fallback_sim.rs b/testing/simulator/src/fallback_sim.rs
index b27a6246bf8..3859257fb75 100644
--- a/testing/simulator/src/fallback_sim.rs
+++ b/testing/simulator/src/fallback_sim.rs
@@ -29,7 +29,7 @@ const DENEB_FORK_EPOCH: u64 = 2;
 // This has potential to block CI so it should be set conservatively enough that spurious failures
 // don't become very common, but not so conservatively that regressions to the fallback mechanism
 // cannot be detected.
-const ACCEPTABLE_FALLBACK_ATTESTATION_HIT_PERCENTAGE: f64 = 85.0;
+const ACCEPTABLE_FALLBACK_ATTESTATION_HIT_PERCENTAGE: f64 = 95.0;
 
 const SUGGESTED_FEE_RECIPIENT: [u8; 20] =
     [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1];
diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml
index bff40b41d5f..4c338e91b9d 100644
--- a/validator_client/Cargo.toml
+++ b/validator_client/Cargo.toml
@@ -10,7 +10,6 @@ path = "src/lib.rs"
 
 [dev-dependencies]
 tokio = { workspace = true }
-itertools = { workspace = true }
 
 [dependencies]
 tree_hash = { workspace = true }
@@ -60,4 +59,5 @@ sysinfo = { workspace = true }
 system_health = { path = "../common/system_health" }
 logging = { workspace = true }
 strum = { workspace = true }
+itertools = { workspace = true }
 fdlimit = "0.3.0"
diff --git a/validator_client/src/attestation_service.rs b/validator_client/src/attestation_service.rs
index 30fe508a2c2..5363f36f665 100644
--- a/validator_client/src/attestation_service.rs
+++ b/validator_client/src/attestation_service.rs
@@ -1,9 +1,8 @@
-use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced};
+use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback};
 use crate::{
     duties_service::{DutiesService, DutyAndProof},
     http_metrics::metrics,
     validator_store::{Error as ValidatorStoreError, ValidatorStore},
-    OfflineOnFailure,
 };
 use environment::RuntimeContext;
 use futures::future::join_all;
@@ -339,21 +338,17 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
 
         let attestation_data = self
             .beacon_nodes
-            .first_success(
-                RequireSynced::No,
-                OfflineOnFailure::Yes,
-                |beacon_node| async move {
-                    let _timer = metrics::start_timer_vec(
-                        &metrics::ATTESTATION_SERVICE_TIMES,
-                        &[metrics::ATTESTATIONS_HTTP_GET],
-                    );
-                    beacon_node
-                        .get_validator_attestation_data(slot, committee_index)
-                        .await
-                        .map_err(|e| format!("Failed to produce attestation data: {:?}", e))
-                        .map(|result| result.data)
-                },
-            )
+            .first_success(|beacon_node| async move {
+                let _timer = metrics::start_timer_vec(
+                    &metrics::ATTESTATION_SERVICE_TIMES,
+                    &[metrics::ATTESTATIONS_HTTP_GET],
+                );
+                beacon_node
+                    .get_validator_attestation_data(slot, committee_index)
+                    .await
+                    .map_err(|e| format!("Failed to produce attestation data: {:?}", e))
+                    .map(|result| result.data)
+            })
             .await
             .map_err(|e| e.to_string())?;
 
@@ -458,26 +453,21 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
 
         // Post the attestations to the BN.
         match self
             .beacon_nodes
-            .request(
-                RequireSynced::No,
-                OfflineOnFailure::Yes,
-                ApiTopic::Attestations,
-                |beacon_node| async move {
-                    let _timer = metrics::start_timer_vec(
-                        &metrics::ATTESTATION_SERVICE_TIMES,
-                        &[metrics::ATTESTATIONS_HTTP_POST],
-                    );
-                    if fork_name.electra_enabled() {
-                        beacon_node
-                            .post_beacon_pool_attestations_v2(attestations, fork_name)
-                            .await
-                    } else {
-                        beacon_node
-                            .post_beacon_pool_attestations_v1(attestations)
-                            .await
-                    }
-                },
-            )
+            .request(ApiTopic::Attestations, |beacon_node| async move {
+                let _timer = metrics::start_timer_vec(
+                    &metrics::ATTESTATION_SERVICE_TIMES,
+                    &[metrics::ATTESTATIONS_HTTP_POST],
+                );
+                if fork_name.electra_enabled() {
+                    beacon_node
+                        .post_beacon_pool_attestations_v2(attestations, fork_name)
+                        .await
+                } else {
+                    beacon_node
+                        .post_beacon_pool_attestations_v1(attestations)
+                        .await
+                }
+            })
             .await
         {
             Ok(()) => info!(
@@ -540,46 +530,38 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
 
         let aggregated_attestation = &self
             .beacon_nodes
-            .first_success(
-                RequireSynced::No,
-                OfflineOnFailure::Yes,
-                |beacon_node| async move {
-                    let _timer = metrics::start_timer_vec(
-                        &metrics::ATTESTATION_SERVICE_TIMES,
-                        &[metrics::AGGREGATES_HTTP_GET],
-                    );
-                    if fork_name.electra_enabled() {
-                        beacon_node
-                            .get_validator_aggregate_attestation_v2(
-                                attestation_data.slot,
-                                attestation_data.tree_hash_root(),
-                                committee_index,
-                            )
-                            .await
-                            .map_err(|e| {
-                                format!("Failed to produce an aggregate attestation: {:?}", e)
-                            })?
-                            .ok_or_else(|| {
-                                format!("No aggregate available for {:?}", attestation_data)
-                            })
-                            .map(|result| result.data)
-                    } else {
-                        beacon_node
-                            .get_validator_aggregate_attestation_v1(
-                                attestation_data.slot,
-                                attestation_data.tree_hash_root(),
-                            )
-                            .await
-                            .map_err(|e| {
-                                format!("Failed to produce an aggregate attestation: {:?}", e)
-                            })?
-                            .ok_or_else(|| {
-                                format!("No aggregate available for {:?}", attestation_data)
-                            })
-                            .map(|result| result.data)
-                    }
-                },
-            )
+            .first_success(|beacon_node| async move {
+                let _timer = metrics::start_timer_vec(
+                    &metrics::ATTESTATION_SERVICE_TIMES,
+                    &[metrics::AGGREGATES_HTTP_GET],
+                );
+                if fork_name.electra_enabled() {
+                    beacon_node
+                        .get_validator_aggregate_attestation_v2(
+                            attestation_data.slot,
+                            attestation_data.tree_hash_root(),
+                            committee_index,
+                        )
+                        .await
+                        .map_err(|e| {
+                            format!("Failed to produce an aggregate attestation: {:?}", e)
+                        })?
+                        .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))
+                        .map(|result| result.data)
+                } else {
+                    beacon_node
+                        .get_validator_aggregate_attestation_v1(
+                            attestation_data.slot,
+                            attestation_data.tree_hash_root(),
+                        )
+                        .await
+                        .map_err(|e| {
+                            format!("Failed to produce an aggregate attestation: {:?}", e)
+                        })?
+                        .ok_or_else(|| format!("No aggregate available for {:?}", attestation_data))
+                        .map(|result| result.data)
+                }
+            })
             .await
             .map_err(|e| e.to_string())?;
 
@@ -637,30 +619,26 @@ impl<T: SlotClock + 'static, E: EthSpec> AttestationService<T, E> {
 
         let signed_aggregate_and_proofs_slice = signed_aggregate_and_proofs.as_slice();
         match self
             .beacon_nodes
-            .first_success(
-                RequireSynced::No,
-                OfflineOnFailure::Yes,
-                |beacon_node| async move {
-                    let _timer = metrics::start_timer_vec(
-                        &metrics::ATTESTATION_SERVICE_TIMES,
-                        &[metrics::AGGREGATES_HTTP_POST],
-                    );
-                    if fork_name.electra_enabled() {
-                        beacon_node
-                            .post_validator_aggregate_and_proof_v2(
-                                signed_aggregate_and_proofs_slice,
-                                fork_name,
-                            )
-                            .await
-                    } else {
-                        beacon_node
-                            .post_validator_aggregate_and_proof_v1(
-                                signed_aggregate_and_proofs_slice,
-                            )
-                            .await
-                    }
-                },
-            )
+            .first_success(|beacon_node| async move {
+                let _timer = metrics::start_timer_vec(
+                    &metrics::ATTESTATION_SERVICE_TIMES,
+                    &[metrics::AGGREGATES_HTTP_POST],
+                );
+                if fork_name.electra_enabled() {
+                    beacon_node
+                        .post_validator_aggregate_and_proof_v2(
+                            signed_aggregate_and_proofs_slice,
+                            fork_name,
+                        )
+                        .await
+                } else {
+                    beacon_node
+                        .post_validator_aggregate_and_proof_v1(
+                            signed_aggregate_and_proofs_slice,
+                        )
+                        .await
+                }
+            })
             .await
         {
             Ok(()) => {
diff --git a/validator_client/src/beacon_node_fallback.rs b/validator_client/src/beacon_node_fallback.rs
index 6bba55d6767..e5fe419983a 100644
--- a/validator_client/src/beacon_node_fallback.rs
+++ b/validator_client/src/beacon_node_fallback.rs
@@ -2,14 +2,19 @@
 //! "fallback" behaviour; it will try a request on all of the nodes until one or none of them
 //! succeed.
 
-use crate::check_synced::check_synced;
+use crate::beacon_node_health::{
+    BeaconNodeHealth, BeaconNodeSyncDistanceTiers, ExecutionEngineHealth, IsOptimistic,
+    SyncDistanceTier,
+};
+use crate::check_synced::check_node_health;
 use crate::http_metrics::metrics::{inc_counter_vec, ENDPOINT_ERRORS, ENDPOINT_REQUESTS};
 use environment::RuntimeContext;
 use eth2::BeaconNodeHttpClient;
 use futures::future;
-use serde::{Deserialize, Serialize};
-use slog::{debug, error, info, warn, Logger};
+use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
+use slog::{debug, error, warn, Logger};
 use slot_clock::SlotClock;
+use std::cmp::Ordering;
 use std::fmt;
 use std::fmt::Debug;
 use std::future::Future;
@@ -18,7 +23,7 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};
 use strum::{EnumString, EnumVariantNames};
 use tokio::{sync::RwLock, time::sleep};
-use types::{ChainSpec, Config, EthSpec};
+use types::{ChainSpec, Config as ConfigSpec, EthSpec, Slot};
 
 /// Message emitted when the VC detects the BN is using a different spec.
 const UPDATE_REQUIRED_LOG_HINT: &str = "this VC or the remote BN may need updating";
@@ -32,6 +37,16 @@ const UPDATE_REQUIRED_LOG_HINT: &str = "this VC or the remote BN may need updati
 /// having the correct nodes up and running prior to the start of the slot.
 const SLOT_LOOKAHEAD: Duration = Duration::from_secs(2);
 
+/// If the beacon node slot_clock is within 1 slot, this is deemed acceptable. Otherwise the node
+/// will be marked as CandidateError::TimeDiscrepancy.
+const FUTURE_SLOT_TOLERANCE: Slot = Slot::new(1);
+
+// Configuration for the Beacon Node fallback.
+#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)]
+pub struct Config {
+    pub sync_tolerances: BeaconNodeSyncDistanceTiers,
+}
+
 /// Indicates a measurement of latency between the VC and a BN.
 pub struct LatencyMeasurement {
     /// An identifier for the beacon node (e.g.
    /// the URL).
@@ -76,34 +91,8 @@ pub fn start_fallback_updater_service<T: SlotClock + 'static, E: EthSpec>(
     Ok(())
 }
 
-/// Indicates if a beacon node must be synced before some action is performed on it.
-#[derive(PartialEq, Clone, Copy)]
-pub enum RequireSynced {
-    Yes,
-    No,
-}
-
-/// Indicates if a beacon node should be set to `Offline` if a request fails.
-#[derive(PartialEq, Clone, Copy)]
-pub enum OfflineOnFailure {
-    Yes,
-    No,
-}
-
-impl PartialEq<bool> for RequireSynced {
-    fn eq(&self, other: &bool) -> bool {
-        if *other {
-            *self == RequireSynced::Yes
-        } else {
-            *self == RequireSynced::No
-        }
-    }
-}
-
 #[derive(Debug)]
 pub enum Error<T> {
-    /// The node was unavailable and we didn't attempt to contact it.
-    Unavailable(CandidateError),
     /// We attempted to contact the node but it failed.
     RequestFailed(T),
 }
@@ -112,7 +101,6 @@ impl<T> Error<T> {
     pub fn request_failure(&self) -> Option<&T> {
         match self {
             Error::RequestFailed(e) => Some(e),
-            _ => None,
         }
     }
 }
@@ -141,106 +129,159 @@ impl<T: Debug> Errors<T> {
 }
 
 /// Reasons why a candidate might not be ready.
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, PartialEq, Deserialize, Serialize)]
 pub enum CandidateError {
+    PreGenesis,
     Uninitialized,
     Offline,
     Incompatible,
-    NotSynced,
+    TimeDiscrepancy,
+}
+
+impl std::fmt::Display for CandidateError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            CandidateError::PreGenesis => write!(f, "PreGenesis"),
+            CandidateError::Uninitialized => write!(f, "Uninitialized"),
+            CandidateError::Offline => write!(f, "Offline"),
+            CandidateError::Incompatible => write!(f, "Incompatible"),
+            CandidateError::TimeDiscrepancy => write!(f, "TimeDiscrepancy"),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Deserialize)]
+pub struct CandidateInfo {
+    pub index: usize,
+    pub endpoint: String,
+    pub health: Result<BeaconNodeHealth, CandidateError>,
+}
+
+impl Serialize for CandidateInfo {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let mut state = serializer.serialize_struct("CandidateInfo", 2)?;
+
+        state.serialize_field("index", &self.index)?;
+        state.serialize_field("endpoint", &self.endpoint)?;
+
+        // Serialize either the health or the error field based on the Result
+        match &self.health {
+            Ok(health) => {
+                state.serialize_field("health", health)?;
+            }
+            Err(e) => {
+                state.serialize_field("error", &e.to_string())?;
+            }
+        }
+
+        state.end()
+    }
 }
 
 /// Represents a `BeaconNodeHttpClient` inside a `BeaconNodeFallback` that may or may not be used
 /// for a query.
+#[derive(Clone, Debug)]
 pub struct CandidateBeaconNode<E> {
-    beacon_node: BeaconNodeHttpClient,
-    status: RwLock<Result<(), CandidateError>>,
+    pub index: usize,
+    pub beacon_node: BeaconNodeHttpClient,
+    pub health: Arc<RwLock<Result<BeaconNodeHealth, CandidateError>>>,
     _phantom: PhantomData<E>,
 }
 
+impl<E> PartialEq for CandidateBeaconNode<E> {
+    fn eq(&self, other: &Self) -> bool {
+        self.index == other.index && self.beacon_node == other.beacon_node
+    }
+}
+
+impl<E> Eq for CandidateBeaconNode<E> {}
+
 impl<E: EthSpec> CandidateBeaconNode<E> {
     /// Instantiate a new node.
-    pub fn new(beacon_node: BeaconNodeHttpClient) -> Self {
+    pub fn new(beacon_node: BeaconNodeHttpClient, index: usize) -> Self {
         Self {
+            index,
             beacon_node,
-            status: RwLock::new(Err(CandidateError::Uninitialized)),
+            health: Arc::new(RwLock::new(Err(CandidateError::Uninitialized))),
            _phantom: PhantomData,
         }
     }
 
-    /// Returns the status of `self`.
-    ///
-    /// If `RequiredSynced::No`, any `NotSynced` node will be ignored and mapped to `Ok(())`.
-    pub async fn status(&self, synced: RequireSynced) -> Result<(), CandidateError> {
-        match *self.status.read().await {
-            Err(CandidateError::NotSynced) if synced == false => Ok(()),
-            other => other,
-        }
-    }
-
-    /// Indicate that `self` is offline.
-    pub async fn set_offline(&self) {
-        *self.status.write().await = Err(CandidateError::Offline)
+    /// Returns the health of `self`.
+    pub async fn health(&self) -> Result<BeaconNodeHealth, CandidateError> {
+        *self.health.read().await
     }
 
-    /// Perform some queries against the node to determine if it is a good candidate, updating
-    /// `self.status` and returning that result.
-    pub async fn refresh_status<T: SlotClock>(
+    pub async fn refresh_health<T: SlotClock>(
         &self,
+        distance_tiers: &BeaconNodeSyncDistanceTiers,
         slot_clock: Option<&T>,
         spec: &ChainSpec,
         log: &Logger,
     ) -> Result<(), CandidateError> {
-        let previous_status = self.status(RequireSynced::Yes).await;
-        let was_offline = matches!(previous_status, Err(CandidateError::Offline));
-
-        let new_status = if let Err(e) = self.is_online(was_offline, log).await {
-            Err(e)
-        } else if let Err(e) = self.is_compatible(spec, log).await {
-            Err(e)
-        } else if let Err(e) = self.is_synced(slot_clock, log).await {
-            Err(e)
-        } else {
-            Ok(())
-        };
-
-        // In case of concurrent use, the latest value will always be used. It's possible that a
-        // long time out might over-ride a recent successful response, leading to a falsely-offline
-        // status. I deem this edge-case acceptable in return for the concurrency benefits of not
-        // holding a write-lock whilst we check the online status of the node.
-        *self.status.write().await = new_status;
-
-        new_status
-    }
+        if let Err(e) = self.is_compatible(spec, log).await {
+            *self.health.write().await = Err(e);
+            return Err(e);
+        }
 
-    /// Checks if the node is reachable.
-    async fn is_online(&self, was_offline: bool, log: &Logger) -> Result<(), CandidateError> {
-        let result = self
-            .beacon_node
-            .get_node_version()
-            .await
-            .map(|body| body.data.version);
-
-        match result {
-            Ok(version) => {
-                if was_offline {
-                    info!(
-                        log,
-                        "Connected to beacon node";
-                        "version" => version,
-                        "endpoint" => %self.beacon_node,
+        if let Some(slot_clock) = slot_clock {
+            match check_node_health(&self.beacon_node, log).await {
+                Ok((head, is_optimistic, el_offline)) => {
+                    let Some(slot_clock_head) = slot_clock.now() else {
+                        let e = match slot_clock.is_prior_to_genesis() {
+                            Some(true) => CandidateError::PreGenesis,
+                            _ => CandidateError::Uninitialized,
+                        };
+                        *self.health.write().await = Err(e);
+                        return Err(e);
+                    };
+
+                    if head > slot_clock_head + FUTURE_SLOT_TOLERANCE {
+                        let e = CandidateError::TimeDiscrepancy;
+                        *self.health.write().await = Err(e);
+                        return Err(e);
+                    }
+                    let sync_distance = slot_clock_head.saturating_sub(head);
+
+                    // Currently ExecutionEngineHealth is solely determined by online status.
+                    let execution_status = if el_offline {
+                        ExecutionEngineHealth::Unhealthy
+                    } else {
+                        ExecutionEngineHealth::Healthy
+                    };
+
+                    let optimistic_status = if is_optimistic {
+                        IsOptimistic::Yes
+                    } else {
+                        IsOptimistic::No
+                    };
+
+                    let new_health = BeaconNodeHealth::from_status(
+                        self.index,
+                        sync_distance,
+                        head,
+                        optimistic_status,
+                        execution_status,
+                        distance_tiers,
                     );
+
+                    *self.health.write().await = Ok(new_health);
+                    Ok(())
+                }
+                Err(e) => {
+                    // Set the health as `Err` which is sorted last in the list.
+                    *self.health.write().await = Err(e);
+                    Err(e)
                 }
-                Ok(())
-            }
-            Err(e) => {
-                warn!(
-                    log,
-                    "Offline beacon node";
-                    "error" => %e,
-                    "endpoint" => %self.beacon_node,
-                );
-                Err(CandidateError::Offline)
             }
+        } else {
+            // Slot clock will only be `None` at startup.
+            let e = CandidateError::Uninitialized;
+            *self.health.write().await = Err(e);
+            Err(e)
         }
     }
 
@@ -248,7 +289,7 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
     async fn is_compatible(&self, spec: &ChainSpec, log: &Logger) -> Result<(), CandidateError> {
         let config = self
             .beacon_node
-            .get_config_spec::<Config>()
+            .get_config_spec::<ConfigSpec>()
             .await
             .map_err(|e| {
                 error!(
@@ -324,27 +365,15 @@ impl<E: EthSpec> CandidateBeaconNode<E> {
 
         Ok(())
     }
-
-    /// Checks if the beacon node is synced.
-    async fn is_synced<T: SlotClock>(
-        &self,
-        slot_clock: Option<&T>,
-        log: &Logger,
-    ) -> Result<(), CandidateError> {
-        if let Some(slot_clock) = slot_clock {
-            check_synced(&self.beacon_node, slot_clock, Some(log)).await
-        } else {
-            // Skip this check if we don't supply a slot clock.
-            Ok(())
-        }
-    }
 }
 
 /// A collection of `CandidateBeaconNode` that can be used to perform requests with "fallback"
 /// behaviour, where the failure of one candidate results in the next candidate receiving an
 /// identical query.
+#[derive(Clone, Debug)]
 pub struct BeaconNodeFallback<T, E> {
-    candidates: Vec<CandidateBeaconNode<E>>,
+    pub candidates: Arc<RwLock<Vec<CandidateBeaconNode<E>>>>,
+    distance_tiers: BeaconNodeSyncDistanceTiers,
     slot_clock: Option<T>,
     broadcast_topics: Vec<ApiTopic>,
     spec: Arc<ChainSpec>,
@@ -354,12 +383,15 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
     pub fn new(
         candidates: Vec<CandidateBeaconNode<E>>,
+        config: Config,
         broadcast_topics: Vec<ApiTopic>,
         spec: Arc<ChainSpec>,
         log: Logger,
     ) -> Self {
+        let distance_tiers = config.sync_tolerances;
         Self {
-            candidates,
+            candidates: Arc::new(RwLock::new(candidates)),
+            distance_tiers,
             slot_clock: None,
            broadcast_topics,
             spec,
@@ -377,41 +409,56 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
     }
 
     /// The count of candidates, regardless of their state.
-    pub fn num_total(&self) -> usize {
-        self.candidates.len()
+    pub async fn num_total(&self) -> usize {
+        self.candidates.read().await.len()
     }
 
-    /// The count of synced and ready candidates.
-    pub async fn num_synced(&self) -> usize {
+    /// The count of candidates that are online and compatible, but not necessarily synced.
+    pub async fn num_available(&self) -> usize {
         let mut n = 0;
-        for candidate in &self.candidates {
-            if candidate.status(RequireSynced::Yes).await.is_ok() {
-                n += 1
+        for candidate in self.candidates.read().await.iter() {
+            match candidate.health().await {
+                Ok(_) | Err(CandidateError::Uninitialized) => n += 1,
+                Err(_) => continue,
             }
         }
         n
     }
 
-    /// The count of synced and ready fallbacks excluding the primary beacon node candidate.
-    pub async fn num_synced_fallback(&self) -> usize {
-        let mut n = 0;
-        for candidate in self.candidates.iter().skip(1) {
-            if candidate.status(RequireSynced::Yes).await.is_ok() {
-                n += 1
+    // Returns all data required by the VC notifier.
+    pub async fn get_notifier_info(&self) -> (Vec<CandidateInfo>, usize, usize) {
+        let candidates = self.candidates.read().await;
+
+        let mut candidate_info = Vec::with_capacity(candidates.len());
+        let mut num_available = 0;
+        let mut num_synced = 0;
+
+        for candidate in candidates.iter() {
+            let health = candidate.health().await;
+
+            match health {
+                Ok(health) => {
+                    if self
+                        .distance_tiers
+                        .compute_distance_tier(health.health_tier.sync_distance)
+                        == SyncDistanceTier::Synced
+                    {
+                        num_synced += 1;
+                    }
+                    num_available += 1;
+                }
+                Err(CandidateError::Uninitialized) => num_available += 1,
+                Err(_) => (),
             }
-        }
-        n
-    }
 
-    /// The count of candidates that are online and compatible, but not necessarily synced.
-    pub async fn num_available(&self) -> usize {
-        let mut n = 0;
-        for candidate in &self.candidates {
-            if candidate.status(RequireSynced::No).await.is_ok() {
-                n += 1
-            }
+            candidate_info.push(CandidateInfo {
+                index: candidate.index,
+                endpoint: candidate.beacon_node.to_string(),
+                health,
+            });
         }
-        n
+
+        (candidate_info, num_available, num_synced)
     }
 
     /// Loop through ALL candidates in `self.candidates` and update their sync status.
@@ -420,26 +467,54 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
     /// low quality responses. To route around this it's best to poll all connected beacon nodes.
     /// A previous implementation of this function polled only the unavailable BNs.
     pub async fn update_all_candidates(&self) {
-        let futures = self
-            .candidates
-            .iter()
-            .map(|candidate| {
-                candidate.refresh_status(self.slot_clock.as_ref(), &self.spec, &self.log)
-            })
-            .collect::<Vec<_>>();
+        // Clone the vec, so we release the read lock immediately.
+        // `candidate.health` is behind an Arc, so this would still allow us to mutate the values.
+        let candidates = self.candidates.read().await.clone();
+        let mut futures = Vec::with_capacity(candidates.len());
+        let mut nodes = Vec::with_capacity(candidates.len());
+
+        for candidate in candidates.iter() {
+            futures.push(candidate.refresh_health(
+                &self.distance_tiers,
+                self.slot_clock.as_ref(),
+                &self.spec,
+                &self.log,
+            ));
+            nodes.push(candidate.beacon_node.to_string());
+        }
 
-        // run all updates concurrently and ignore errors
-        let _ = future::join_all(futures).await;
+        // Run all updates concurrently.
+        let future_results = future::join_all(futures).await;
+        let results = future_results.iter().zip(nodes);
+
+        for (result, node) in results {
+            if let Err(e) = result {
+                if *e != CandidateError::PreGenesis {
+                    warn!(
+                        self.log,
+                        "A connected beacon node errored during routine health check";
+                        "error" => ?e,
+                        "endpoint" => node,
+                    );
+                }
+            }
+        }
+
+        drop(candidates);
+
+        let mut candidates = self.candidates.write().await;
+        sort_nodes_by_health(&mut candidates).await;
     }
 
     /// Concurrently send a request to all candidates (regardless of
     /// offline/online) status and attempt to collect a rough reading on the
     /// latency between the VC and candidate.
     pub async fn measure_latency(&self) -> Vec<LatencyMeasurement> {
-        let futures: Vec<_> = self
-            .candidates
-            .iter()
-            .map(|candidate| async {
+        let candidates = self.candidates.read().await;
+        let futures: Vec<_> = candidates
+            .clone()
+            .into_iter()
+            .map(|candidate| async move {
                 let beacon_node_id = candidate.beacon_node.to_string();
                 // The `node/version` endpoint is used since I imagine it would
                 // require the least processing in the BN and therefore measure
@@ -456,6 +531,7 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
                 (beacon_node_id, response_instant)
             })
            .collect();
+        drop(candidates);
 
         let request_instant = Instant::now();
 
@@ -475,225 +551,120 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
 
     /// Run `func` against each candidate in `self`, returning immediately if a result is found.
     /// Otherwise, return all the errors encountered along the way.
-    ///
-    /// First this function will try all nodes with a suitable status. If no candidates are suitable
-    /// or all the requests fail, it will try updating the status of all unsuitable nodes and
-    /// re-running `func` again.
-    pub async fn first_success<'a, F, O, Err, R>(
-        &'a self,
-        require_synced: RequireSynced,
-        offline_on_failure: OfflineOnFailure,
-        func: F,
-    ) -> Result<O, Errors<Err>>
+    pub async fn first_success<F, O, Err, R>(&self, func: F) -> Result<O, Errors<Err>>
     where
-        F: Fn(&'a BeaconNodeHttpClient) -> R,
+        F: Fn(BeaconNodeHttpClient) -> R,
         R: Future<Output = Result<O, Err>>,
         Err: Debug,
     {
         let mut errors = vec![];
-        let mut to_retry = vec![];
-        let mut retry_unsynced = vec![];
-        let log = &self.log.clone();
+
+        // First pass: try `func` on all candidates. Candidate order has already been set in
+        // `update_all_candidates`. This ensures the most suitable node is always tried first.
+        let candidates = self.candidates.read().await;
+        let mut futures = vec![];
 
         // Run `func` using a `candidate`, returning the value or capturing errors.
-        //
-        // We use a macro instead of a closure here since it is not trivial to move `func` into a
-        // closure.
-        macro_rules! try_func {
-            ($candidate: ident) => {{
-                inc_counter_vec(&ENDPOINT_REQUESTS, &[$candidate.beacon_node.as_ref()]);
-
-                // There exists a race condition where `func` may be called when the candidate is
-                // actually not ready. We deem this an acceptable inefficiency.
-                match func(&$candidate.beacon_node).await {
-                    Ok(val) => return Ok(val),
-                    Err(e) => {
-                        debug!(
-                            log,
-                            "Request to beacon node failed";
-                            "node" => $candidate.beacon_node.to_string(),
-                            "error" => ?e,
-                        );
-                        // If we have an error on this function, make the client as not-ready.
-                        //
-                        // There exists a race condition where the candidate may have been marked
-                        // as ready between the `func` call and now. We deem this an acceptable
-                        // inefficiency.
-                        if matches!(offline_on_failure, OfflineOnFailure::Yes) {
-                            $candidate.set_offline().await;
-                        }
-                        errors.push(($candidate.beacon_node.to_string(), Error::RequestFailed(e)));
-                        inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]);
-                    }
-                }
-            }};
+        for candidate in candidates.iter() {
+            futures.push(Self::run_on_candidate(
+                candidate.beacon_node.clone(),
+                &func,
+                &self.log,
+            ));
         }
+        drop(candidates);
 
-        // First pass: try `func` on all synced and ready candidates.
-        //
-        // This ensures that we always choose a synced node if it is available.
-        for candidate in &self.candidates {
-            match candidate.status(RequireSynced::Yes).await {
-                Err(e @ CandidateError::NotSynced) if require_synced == false => {
-                    // This client is unsynced we will try it after trying all synced clients
-                    retry_unsynced.push(candidate);
-                    errors.push((candidate.beacon_node.to_string(), Error::Unavailable(e)));
-                }
-                Err(e) => {
-                    // This client was not ready on the first pass, we might try it again later.
-                    to_retry.push(candidate);
-                    errors.push((candidate.beacon_node.to_string(), Error::Unavailable(e)));
-                }
-                _ => try_func!(candidate),
+        for future in futures {
+            match future.await {
+                Ok(val) => return Ok(val),
+                Err(e) => errors.push(e),
             }
         }
 
-        // Second pass: try `func` on ready unsynced candidates. This only runs if we permit
-        // unsynced candidates.
-        //
-        // Due to async race-conditions, it is possible that we will send a request to a candidate
-        // that has been set to an offline/unready status. This is acceptable.
-        if require_synced == false {
-            for candidate in retry_unsynced {
-                try_func!(candidate);
-            }
-        }
+        // Second pass. No candidates returned successfully. Try again with the same order.
+        // This will duplicate errors.
+        let candidates = self.candidates.read().await;
+        let mut futures = vec![];
 
-        // Third pass: try again, attempting to make non-ready clients become ready.
-        for candidate in to_retry {
-            // If the candidate hasn't luckily transferred into the correct state in the meantime,
-            // force an update of the state.
-            let new_status = match candidate.status(require_synced).await {
-                Ok(()) => Ok(()),
-                Err(_) => {
-                    candidate
-                        .refresh_status(self.slot_clock.as_ref(), &self.spec, &self.log)
-                        .await
-                }
-            };
+        // Run `func` using a `candidate`, returning the value or capturing errors.
+        for candidate in candidates.iter() {
+            futures.push(Self::run_on_candidate(
+                candidate.beacon_node.clone(),
+                &func,
+                &self.log,
+            ));
+        }
+        drop(candidates);
 
-            match new_status {
-                Ok(()) => try_func!(candidate),
-                Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate),
-                Err(e) => {
-                    errors.push((candidate.beacon_node.to_string(), Error::Unavailable(e)));
-                }
+        for future in futures {
+            match future.await {
+                Ok(val) => return Ok(val),
+                Err(e) => errors.push(e),
             }
         }
 
-        // There were no candidates already ready and we were unable to make any of them ready.
+        // No candidates returned successfully.
         Err(Errors(errors))
     }
 
+    /// Run the future `func` on `candidate` while reporting metrics.
+    async fn run_on_candidate<F, O, Err, R>(
+        candidate: BeaconNodeHttpClient,
+        func: F,
+        log: &Logger,
+    ) -> Result<O, (String, Error<Err>)>
+    where
+        F: Fn(BeaconNodeHttpClient) -> R,
+        R: Future<Output = Result<O, Err>>,
+        Err: Debug,
+    {
+        inc_counter_vec(&ENDPOINT_REQUESTS, &[candidate.as_ref()]);
+
+        // There exists a race condition where `func` may be called when the candidate is
+        // actually not ready. We deem this an acceptable inefficiency.
+        match func(candidate.clone()).await {
+            Ok(val) => Ok(val),
+            Err(e) => {
+                debug!(
+                    log,
+                    "Request to beacon node failed";
+                    "node" => %candidate,
+                    "error" => ?e,
+                );
+                inc_counter_vec(&ENDPOINT_ERRORS, &[candidate.as_ref()]);
+                Err((candidate.to_string(), Error::RequestFailed(e)))
+            }
+        }
+    }
+
     /// Run `func` against all candidates in `self`, collecting the result of `func` against each
     /// candidate.
     ///
-    /// First this function will try all nodes with a suitable status. If no candidates are suitable
-    /// it will try updating the status of all unsuitable nodes and re-running `func` again.
-    ///
     /// Note: This function returns `Ok(())` if `func` returned successfully on all beacon nodes.
     /// It returns a list of errors along with the beacon node id that failed for `func`.
     /// Since this ignores the actual result of `func`, this function should only be used for beacon
     /// node calls whose results we do not care about, only that they completed successfully.
-    pub async fn broadcast<'a, F, O, Err, R>(
-        &'a self,
-        require_synced: RequireSynced,
-        offline_on_failure: OfflineOnFailure,
-        func: F,
-    ) -> Result<(), Errors<Err>>
+    pub async fn broadcast<F, O, Err, R>(&self, func: F) -> Result<(), Errors<Err>>
     where
-        F: Fn(&'a BeaconNodeHttpClient) -> R,
+        F: Fn(BeaconNodeHttpClient) -> R,
         R: Future<Output = Result<O, Err>>,
+        Err: Debug,
     {
-        let mut to_retry = vec![];
-        let mut retry_unsynced = vec![];
+        // Run `func` on all candidates.
+        let candidates = self.candidates.read().await;
+        let mut futures = vec![];
 
         // Run `func` using a `candidate`, returning the value or capturing errors.
-        let run_on_candidate = |candidate: &'a CandidateBeaconNode<E>| async {
-            inc_counter_vec(&ENDPOINT_REQUESTS, &[candidate.beacon_node.as_ref()]);
-
-            // There exists a race condition where `func` may be called when the candidate is
-            // actually not ready. We deem this an acceptable inefficiency.
-            match func(&candidate.beacon_node).await {
-                Ok(val) => Ok(val),
-                Err(e) => {
-                    // If we have an error on this function, mark the client as not-ready.
-                    //
-                    // There exists a race condition where the candidate may have been marked
-                    // as ready between the `func` call and now. We deem this an acceptable
-                    // inefficiency.
-                    if matches!(offline_on_failure, OfflineOnFailure::Yes) {
-                        candidate.set_offline().await;
-                    }
-                    inc_counter_vec(&ENDPOINT_ERRORS, &[candidate.beacon_node.as_ref()]);
-                    Err((candidate.beacon_node.to_string(), Error::RequestFailed(e)))
-                }
-            }
-        };
-
-        // First pass: try `func` on all synced and ready candidates.
-        //
-        // This ensures that we always choose a synced node if it is available.
-        let mut first_batch_futures = vec![];
-        for candidate in &self.candidates {
-            match candidate.status(RequireSynced::Yes).await {
-                Ok(_) => {
-                    first_batch_futures.push(run_on_candidate(candidate));
-                }
-                Err(CandidateError::NotSynced) if require_synced == false => {
-                    // This client is unsynced we will try it after trying all synced clients
-                    retry_unsynced.push(candidate);
-                }
-                Err(_) => {
-                    // This client was not ready on the first pass, we might try it again later.
-                    to_retry.push(candidate);
-                }
-            }
-        }
-        let first_batch_results = futures::future::join_all(first_batch_futures).await;
-
-        // Second pass: try `func` on ready unsynced candidates. This only runs if we permit
-        // unsynced candidates.
-        //
-        // Due to async race-conditions, it is possible that we will send a request to a candidate
-        // that has been set to an offline/unready status. This is acceptable.
-        let second_batch_results = if require_synced == false {
-            futures::future::join_all(retry_unsynced.into_iter().map(run_on_candidate)).await
-        } else {
-            vec![]
-        };
-
-        // Third pass: try again, attempting to make non-ready clients become ready.
-        let mut third_batch_futures = vec![];
-        let mut third_batch_results = vec![];
-        for candidate in to_retry {
-            // If the candidate hasn't luckily transferred into the correct state in the meantime,
-            // force an update of the state.
-            let new_status = match candidate.status(require_synced).await {
-                Ok(()) => Ok(()),
-                Err(_) => {
-                    candidate
-                        .refresh_status(self.slot_clock.as_ref(), &self.spec, &self.log)
-                        .await
-                }
-            };
-
-            match new_status {
-                Ok(()) => third_batch_futures.push(run_on_candidate(candidate)),
-                Err(CandidateError::NotSynced) if require_synced == false => {
-                    third_batch_futures.push(run_on_candidate(candidate))
-                }
-                Err(e) => third_batch_results.push(Err((
-                    candidate.beacon_node.to_string(),
-                    Error::Unavailable(e),
-                ))),
-            }
+        for candidate in candidates.iter() {
+            futures.push(Self::run_on_candidate(
+                candidate.beacon_node.clone(),
+                &func,
+                &self.log,
+            ));
         }
-        third_batch_results.extend(futures::future::join_all(third_batch_futures).await);
+        drop(candidates);
 
-        let mut results = first_batch_results;
-        results.extend(second_batch_results);
-        results.extend(third_batch_results);
+        let results = future::join_all(futures).await;
 
         let errors: Vec<_> = results.into_iter().filter_map(|res| res.err()).collect();
 
@@ -706,29 +677,47 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
 
     /// Call `func` on first beacon node that returns success or on all beacon nodes
     /// depending on the `topic` and configuration.
-    pub async fn request<'a, F, Err, R>(
-        &'a self,
-        require_synced: RequireSynced,
-        offline_on_failure: OfflineOnFailure,
-        topic: ApiTopic,
-        func: F,
-    ) -> Result<(), Errors<Err>>
+    pub async fn request<F, Err, R>(&self, topic: ApiTopic, func: F) -> Result<(), Errors<Err>>
     where
-        F: Fn(&'a BeaconNodeHttpClient) -> R,
+        F: Fn(BeaconNodeHttpClient) -> R,
         R: Future<Output = Result<(), Err>>,
         Err: Debug,
     {
         if self.broadcast_topics.contains(&topic) {
-            self.broadcast(require_synced, offline_on_failure, func)
-                .await
+            self.broadcast(func).await
         } else {
-            self.first_success(require_synced, offline_on_failure, func)
-                .await?;
+            self.first_success(func).await?;
             Ok(())
         }
     }
 }
 
+/// Helper function to allow sorting candidate nodes by health.
+async fn sort_nodes_by_health<E: EthSpec>(nodes: &mut Vec<CandidateBeaconNode<E>>) {
+    // Fetch all health values.
+    let health_results: Vec<Result<BeaconNodeHealth, CandidateError>> =
+        future::join_all(nodes.iter().map(|node| node.health())).await;
+
+    // Pair health results with their indices.
+    let mut indices_with_health: Vec<(usize, Result<BeaconNodeHealth, CandidateError>)> =
+        health_results.into_iter().enumerate().collect();
+
+    // Sort indices based on their health.
+    indices_with_health.sort_by(|a, b| match (&a.1, &b.1) {
+        (Ok(health_a), Ok(health_b)) => health_a.cmp(health_b),
+        (Err(_), Ok(_)) => Ordering::Greater,
+        (Ok(_), Err(_)) => Ordering::Less,
+        (Err(_), Err(_)) => Ordering::Equal,
+    });
+
+    // Reorder candidates based on the sorted indices.
+    let sorted_nodes: Vec<CandidateBeaconNode<E>> = indices_with_health
+        .into_iter()
+        .map(|(index, _)| nodes[index].clone())
+        .collect();
+
+    *nodes = sorted_nodes;
+}
+
 /// Serves as a cue for `BeaconNodeFallback` to tell which requests need to be broadcasted.
 #[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize, EnumString, EnumVariantNames)]
 #[strum(serialize_all = "kebab-case")]
@@ -747,10 +736,16 @@ impl ApiTopic {
 }
 
 #[cfg(test)]
-mod test {
+mod tests {
     use super::*;
+    use crate::beacon_node_health::BeaconNodeHealthTier;
+    use crate::SensitiveUrl;
+    use eth2::Timeouts;
     use std::str::FromStr;
     use strum::VariantNames;
+    use types::{MainnetEthSpec, Slot};
+
+    type E = MainnetEthSpec;
 
     #[test]
     fn api_topic_all() {
@@ -761,4 +756,115 @@
             .map(|topic| ApiTopic::from_str(topic).unwrap())
             .eq(all.into_iter()));
     }
+
+    #[tokio::test]
+    async fn check_candidate_order() {
+        // These fields are irrelevant for sorting. They are set to arbitrary values.
+        let head = Slot::new(99);
+        let optimistic_status = IsOptimistic::No;
+        let execution_status = ExecutionEngineHealth::Healthy;
+
+        fn new_candidate(index: usize) -> CandidateBeaconNode<E> {
+            let beacon_node = BeaconNodeHttpClient::new(
+                SensitiveUrl::parse(&format!("http://example_{index}.com")).unwrap(),
+                Timeouts::set_all(Duration::from_secs(index as u64)),
+            );
+            CandidateBeaconNode::new(beacon_node, index)
+        }
+
+        let candidate_1 = new_candidate(1);
+        let expected_candidate_1 = new_candidate(1);
+        let candidate_2 = new_candidate(2);
+        let expected_candidate_2 = new_candidate(2);
+        let candidate_3 = new_candidate(3);
+        let expected_candidate_3 = new_candidate(3);
+        let candidate_4 = new_candidate(4);
+        let expected_candidate_4 = new_candidate(4);
+        let candidate_5 = new_candidate(5);
+        let expected_candidate_5 = new_candidate(5);
+        let candidate_6 = new_candidate(6);
+        let expected_candidate_6 = new_candidate(6);
+
+        let synced = SyncDistanceTier::Synced;
+        let small = SyncDistanceTier::Small;
+
+        // Despite `health_1` having a larger sync distance, it is inside the `synced` range which
+        // does not tie-break on sync distance and so will tie-break on `user_index` instead.
+        let health_1 = BeaconNodeHealth {
+            user_index: 1,
+            head,
+            optimistic_status,
+            execution_status,
+            health_tier: BeaconNodeHealthTier::new(1, Slot::new(2), synced),
+        };
+        let health_2 = BeaconNodeHealth {
+            user_index: 2,
+            head,
+            optimistic_status,
+            execution_status,
+            health_tier: BeaconNodeHealthTier::new(2, Slot::new(1), synced),
+        };
+
+        // `health_3` and `health_4` have the same health tier and sync distance so should
+        // tie-break on `user_index`.
+        let health_3 = BeaconNodeHealth {
+            user_index: 3,
+            head,
+            optimistic_status,
+            execution_status,
+            health_tier: BeaconNodeHealthTier::new(3, Slot::new(9), small),
+        };
+        let health_4 = BeaconNodeHealth {
+            user_index: 4,
+            head,
+            optimistic_status,
+            execution_status,
+            health_tier: BeaconNodeHealthTier::new(3, Slot::new(9), small),
+        };
+
+        // `health_5` has a smaller sync distance and is outside the `synced` range so should be
+        // sorted first. Note the values of `user_index`.
+        let health_5 = BeaconNodeHealth {
+            user_index: 6,
+            head,
+            optimistic_status,
+            execution_status,
+            health_tier: BeaconNodeHealthTier::new(4, Slot::new(9), small),
+        };
+        let health_6 = BeaconNodeHealth {
+            user_index: 5,
+            head,
+            optimistic_status,
+            execution_status,
+            health_tier: BeaconNodeHealthTier::new(4, Slot::new(10), small),
+        };
+
+        *candidate_1.health.write().await = Ok(health_1);
+        *candidate_2.health.write().await = Ok(health_2);
+        *candidate_3.health.write().await = Ok(health_3);
+        *candidate_4.health.write().await = Ok(health_4);
+        *candidate_5.health.write().await = Ok(health_5);
+        *candidate_6.health.write().await = Ok(health_6);
+
+        let mut candidates = vec![
+            candidate_3,
+            candidate_6,
+            candidate_5,
+            candidate_1,
+            candidate_4,
+            candidate_2,
+        ];
+        let expected_candidates = vec![
+            expected_candidate_1,
+            expected_candidate_2,
+            expected_candidate_3,
+            expected_candidate_4,
+            expected_candidate_5,
+            expected_candidate_6,
+        ];
+
+        sort_nodes_by_health(&mut candidates).await;
+
+        assert_eq!(candidates, expected_candidates);
+    }
 }
diff --git a/validator_client/src/beacon_node_health.rs b/validator_client/src/beacon_node_health.rs
new file mode 100644
index 00000000000..1783bb312cf
--- /dev/null
+++ b/validator_client/src/beacon_node_health.rs
@@ -0,0 +1,420 @@
+use itertools::Itertools;
+use serde::{Deserialize, Serialize};
+use std::cmp::Ordering;
+use std::fmt::{Debug, Display, Formatter};
+use std::str::FromStr;
+use types::Slot;
+
+/// Sync distances between 0 and DEFAULT_SYNC_TOLERANCE are considered `synced`.
+/// Sync distance tiers are determined by the different modifiers.
+///
+/// The default range is the following:
+/// Synced: 0..=8
+/// Small: 9..=16
+/// Medium: 17..=64
+/// Large: 65..
+const DEFAULT_SYNC_TOLERANCE: Slot = Slot::new(8);
+const DEFAULT_SMALL_SYNC_DISTANCE_MODIFIER: Slot = Slot::new(8);
+const DEFAULT_MEDIUM_SYNC_DISTANCE_MODIFIER: Slot = Slot::new(48);
+
+type HealthTier = u8;
+type SyncDistance = Slot;
+
+/// Helpful enum which is used when pattern matching to determine health tier.
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
+pub enum SyncDistanceTier {
+    Synced,
+    Small,
+    Medium,
+    Large,
+}
+
+/// Contains the different sync distance tiers which are determined at runtime by the
+/// `beacon-nodes-sync-tolerances` flag.
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
+pub struct BeaconNodeSyncDistanceTiers {
+    pub synced: SyncDistance,
+    pub small: SyncDistance,
+    pub medium: SyncDistance,
+}
+
+impl Default for BeaconNodeSyncDistanceTiers {
+    fn default() -> Self {
+        Self {
+            synced: DEFAULT_SYNC_TOLERANCE,
+            small: DEFAULT_SYNC_TOLERANCE + DEFAULT_SMALL_SYNC_DISTANCE_MODIFIER,
+            medium: DEFAULT_SYNC_TOLERANCE
+                + DEFAULT_SMALL_SYNC_DISTANCE_MODIFIER
+                + DEFAULT_MEDIUM_SYNC_DISTANCE_MODIFIER,
+        }
+    }
+}
+
+impl FromStr for BeaconNodeSyncDistanceTiers {
+    type Err = String;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let values: (u64, u64, u64) = s
+            .split(',')
+            .map(|s| {
+                s.parse()
+                    .map_err(|e| format!("Invalid sync distance modifier: {e:?}"))
+            })
+            .collect::<Result<Vec<u64>, _>>()?
+            .into_iter()
+            .collect_tuple()
+            .ok_or("Invalid number of sync distance modifiers".to_string())?;
+
+        Ok(BeaconNodeSyncDistanceTiers {
+            synced: Slot::new(values.0),
+            small: Slot::new(values.0 + values.1),
+            medium: Slot::new(values.0 + values.1 + values.2),
+        })
+    }
+}
+
+impl BeaconNodeSyncDistanceTiers {
+    /// Takes a given sync distance and determines its tier based on the `sync_tolerance` defined by
+    /// the CLI.
+    pub fn compute_distance_tier(&self, distance: SyncDistance) -> SyncDistanceTier {
+        if distance <= self.synced {
+            SyncDistanceTier::Synced
+        } else if distance <= self.small {
+            SyncDistanceTier::Small
+        } else if distance <= self.medium {
+            SyncDistanceTier::Medium
+        } else {
+            SyncDistanceTier::Large
+        }
+    }
+}
+
+/// Execution Node health metrics.
+///
+/// Currently only considers `el_offline`.
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
+pub enum ExecutionEngineHealth {
+    Healthy,
+    Unhealthy,
+}
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
+pub enum IsOptimistic {
+    Yes,
+    No,
+}
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
+pub struct BeaconNodeHealthTier {
+    pub tier: HealthTier,
+    pub sync_distance: SyncDistance,
+    pub distance_tier: SyncDistanceTier,
+}
+
+impl Display for BeaconNodeHealthTier {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "Tier{}({})", self.tier, self.sync_distance)
+    }
+}
+
+impl Ord for BeaconNodeHealthTier {
+    fn cmp(&self, other: &Self) -> Ordering {
+        let ordering = self.tier.cmp(&other.tier);
+        if ordering == Ordering::Equal {
+            if self.distance_tier == SyncDistanceTier::Synced {
+                // Don't tie-break on sync distance in these cases.
+                // This ensures validator clients don't artificially prefer one node.
+                ordering
+            } else {
+                self.sync_distance.cmp(&other.sync_distance)
+            }
+        } else {
+            ordering
+        }
+    }
+}
+
+impl PartialOrd for BeaconNodeHealthTier {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl BeaconNodeHealthTier {
+    pub fn new(
+        tier: HealthTier,
+        sync_distance: SyncDistance,
+        distance_tier: SyncDistanceTier,
+    ) -> Self {
+        Self {
+            tier,
+            sync_distance,
+            distance_tier,
+        }
+    }
+}
+
+/// Beacon Node Health metrics.
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
+pub struct BeaconNodeHealth {
+    // The index of the Beacon Node. This should correspond with its position in the
+    // `--beacon-nodes` list. Note that the `user_index` field is used to tie-break nodes with the
+    // same health so that nodes with a lower index are preferred.
+    pub user_index: usize,
+    // The slot number of the head.
+    pub head: Slot,
+    // Whether the node is optimistically synced.
+    pub optimistic_status: IsOptimistic,
+    // The status of the node's connected Execution Engine.
+    pub execution_status: ExecutionEngineHealth,
+    // The overall health tier of the Beacon Node. Used to rank the nodes for the purposes of
+    // fallbacks.
+    pub health_tier: BeaconNodeHealthTier,
+}
+
+impl Ord for BeaconNodeHealth {
+    fn cmp(&self, other: &Self) -> Ordering {
+        let ordering = self.health_tier.cmp(&other.health_tier);
+        if ordering == Ordering::Equal {
+            // Tie-break node health by `user_index`.
+            self.user_index.cmp(&other.user_index)
+        } else {
+            ordering
+        }
+    }
+}
+
+impl PartialOrd for BeaconNodeHealth {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl BeaconNodeHealth {
+    pub fn from_status(
+        user_index: usize,
+        sync_distance: Slot,
+        head: Slot,
+        optimistic_status: IsOptimistic,
+        execution_status: ExecutionEngineHealth,
+        distance_tiers: &BeaconNodeSyncDistanceTiers,
+    ) -> Self {
+        let health_tier = BeaconNodeHealth::compute_health_tier(
+            sync_distance,
+            optimistic_status,
+            execution_status,
+            distance_tiers,
+        );
+
+        Self {
+            user_index,
+            head,
+            optimistic_status,
+            execution_status,
+            health_tier,
+        }
+    }
+
+    pub fn get_index(&self) -> usize {
+        self.user_index
+    }
+
+    pub fn get_health_tier(&self) -> BeaconNodeHealthTier {
+        self.health_tier
+    }
+
+    fn compute_health_tier(
+        sync_distance: SyncDistance,
+        optimistic_status: IsOptimistic,
+        execution_status: ExecutionEngineHealth,
+        sync_distance_tiers: &BeaconNodeSyncDistanceTiers,
+    ) -> BeaconNodeHealthTier {
+        let sync_distance_tier = sync_distance_tiers.compute_distance_tier(sync_distance);
+        let health = (sync_distance_tier, optimistic_status, execution_status);
+
+        match health {
+            (SyncDistanceTier::Synced, IsOptimistic::No, ExecutionEngineHealth::Healthy) => {
+                BeaconNodeHealthTier::new(1, sync_distance, sync_distance_tier)
+            }
+            (SyncDistanceTier::Small, IsOptimistic::No, ExecutionEngineHealth::Healthy) => {
+                BeaconNodeHealthTier::new(2, sync_distance, sync_distance_tier)
+            }
+            (SyncDistanceTier::Synced, IsOptimistic::No, ExecutionEngineHealth::Unhealthy) => {
+                BeaconNodeHealthTier::new(3, sync_distance, sync_distance_tier)
+            }
+            (SyncDistanceTier::Medium, IsOptimistic::No, ExecutionEngineHealth::Healthy) => {
+                BeaconNodeHealthTier::new(4, sync_distance, sync_distance_tier)
+            }
+            (SyncDistanceTier::Synced, IsOptimistic::Yes, ExecutionEngineHealth::Healthy) => {
+                BeaconNodeHealthTier::new(5, sync_distance, sync_distance_tier)
+            }
+            (SyncDistanceTier::Synced, IsOptimistic::Yes, ExecutionEngineHealth::Unhealthy) => {
+                BeaconNodeHealthTier::new(6, sync_distance, sync_distance_tier)
+            }
+            (SyncDistanceTier::Small, IsOptimistic::No, ExecutionEngineHealth::Unhealthy) => {
+                BeaconNodeHealthTier::new(7, sync_distance, sync_distance_tier)
+            }
+            (SyncDistanceTier::Small, IsOptimistic::Yes, ExecutionEngineHealth::Healthy) => {
+                BeaconNodeHealthTier::new(8, sync_distance, sync_distance_tier)
+            }
+            (SyncDistanceTier::Small, IsOptimistic::Yes, ExecutionEngineHealth::Unhealthy) => {
+                BeaconNodeHealthTier::new(9, sync_distance, sync_distance_tier)
+            }
+            (SyncDistanceTier::Large, IsOptimistic::No, ExecutionEngineHealth::Healthy) => {
+                BeaconNodeHealthTier::new(10, sync_distance, sync_distance_tier)
+            }
+            (SyncDistanceTier::Medium, IsOptimistic::No, ExecutionEngineHealth::Unhealthy) => {
+                BeaconNodeHealthTier::new(11, sync_distance, sync_distance_tier)
+            }
+            (SyncDistanceTier::Medium, IsOptimistic::Yes, ExecutionEngineHealth::Healthy) => {
+                BeaconNodeHealthTier::new(12, sync_distance, sync_distance_tier)
+            }
+            (SyncDistanceTier::Medium, IsOptimistic::Yes, ExecutionEngineHealth::Unhealthy) => {
+                BeaconNodeHealthTier::new(13, sync_distance, sync_distance_tier)
+            }
+            (SyncDistanceTier::Large, IsOptimistic::No, ExecutionEngineHealth::Unhealthy) => {
+                BeaconNodeHealthTier::new(14, sync_distance, sync_distance_tier)
+            }
+            (SyncDistanceTier::Large, IsOptimistic::Yes, ExecutionEngineHealth::Healthy) => {
+                BeaconNodeHealthTier::new(15, sync_distance, sync_distance_tier)
+            }
+            (SyncDistanceTier::Large, IsOptimistic::Yes, ExecutionEngineHealth::Unhealthy) => {
+                BeaconNodeHealthTier::new(16, sync_distance, sync_distance_tier)
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::ExecutionEngineHealth::{Healthy, Unhealthy};
+    use super::{
+        BeaconNodeHealth, BeaconNodeHealthTier, BeaconNodeSyncDistanceTiers, IsOptimistic,
+        SyncDistanceTier,
+    };
+    use crate::beacon_node_fallback::Config;
+    use std::str::FromStr;
+    use types::Slot;
+
+    #[test]
+    fn all_possible_health_tiers() {
+        let config = Config::default();
+        let beacon_node_sync_distance_tiers = config.sync_tolerances;
+
+        let mut health_vec = vec![];
+
+        for head_slot in 0..=64 {
+            for optimistic_status in &[IsOptimistic::No, IsOptimistic::Yes] {
+                for ee_health in &[Healthy, Unhealthy] {
+                    let health = BeaconNodeHealth::from_status(
+                        0,
+                        Slot::new(0),
+                        Slot::new(head_slot),
+                        *optimistic_status,
+                        *ee_health,
+                        &beacon_node_sync_distance_tiers,
+                    );
+                    health_vec.push(health);
+                }
+            }
+        }
+
+        for health in health_vec {
+            let health_tier = health.get_health_tier();
+            let tier = health_tier.tier;
+            let distance = health_tier.sync_distance;
+
+            let distance_tier = beacon_node_sync_distance_tiers.compute_distance_tier(distance);
+
+            // Check sync distance.
+            if [1, 3, 5, 6].contains(&tier) {
+                assert!(distance_tier == SyncDistanceTier::Synced)
+            } else if [2, 7, 8, 9].contains(&tier) {
+                assert!(distance_tier == SyncDistanceTier::Small);
+            } else if [4, 11, 12, 13].contains(&tier) {
+                assert!(distance_tier == SyncDistanceTier::Medium);
+            } else {
+                assert!(distance_tier == SyncDistanceTier::Large);
+            }
+
+            // Check optimistic status.
+            if [1, 2, 3, 4, 7, 10, 11, 14].contains(&tier) {
+                assert_eq!(health.optimistic_status, IsOptimistic::No);
+            } else {
+                assert_eq!(health.optimistic_status, IsOptimistic::Yes);
+            }
+
+            // Check execution health.
+            if [3, 6, 7, 9, 11, 13, 14, 16].contains(&tier) {
+                assert_eq!(health.execution_status, Unhealthy);
+            } else {
+                assert_eq!(health.execution_status, Healthy);
+            }
+        }
+    }
+
+    fn new_distance_tier(
+        distance: u64,
+        distance_tiers: &BeaconNodeSyncDistanceTiers,
+    ) -> BeaconNodeHealthTier {
+        BeaconNodeHealth::compute_health_tier(
+            Slot::new(distance),
+            IsOptimistic::No,
+            Healthy,
+            distance_tiers,
+        )
+    }
+
+    #[test]
+    fn sync_tolerance_default() {
+        let distance_tiers = BeaconNodeSyncDistanceTiers::default();
+
+        let synced_low = new_distance_tier(0, &distance_tiers);
+        let synced_high = new_distance_tier(8, &distance_tiers);
+
+        let small_low = new_distance_tier(9, &distance_tiers);
+        let small_high = new_distance_tier(16, &distance_tiers);
+
+        let medium_low = new_distance_tier(17, &distance_tiers);
+        let medium_high = new_distance_tier(64, &distance_tiers);
+        let large = new_distance_tier(65, &distance_tiers);
+
+        assert_eq!(synced_low.tier, 1);
+        assert_eq!(synced_high.tier, 1);
+        assert_eq!(small_low.tier, 2);
+        assert_eq!(small_high.tier, 2);
+        assert_eq!(medium_low.tier, 4);
+        assert_eq!(medium_high.tier, 4);
+        assert_eq!(large.tier, 10);
+    }
+
+    #[test]
+    fn sync_tolerance_from_str() {
+        // String should set the tiers as:
+        // synced: 0..=4
+        // small: 5..=8
+        // medium: 9..=12
+        // large: 13..
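(Aside: the boundaries sketched in the comment above are cumulative, mirroring the `FromStr` impl earlier in this file — each flag value is a range *size*, not an endpoint. A worked example of the arithmetic, with hypothetical bindings for illustration only:)

```rust
// `--beacon-nodes-sync-tolerances 4,4,4` parses into cumulative boundaries.
let (synced, small, medium) = (4u64, 4, 4);
assert_eq!(synced, 4);                   // `Synced`: 0..=4
assert_eq!(synced + small, 8);           // `Small`:  5..=8
assert_eq!(synced + small + medium, 12); // `Medium`: 9..=12, `Large`: 13..
```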
+
+        let distance_tiers = BeaconNodeSyncDistanceTiers::from_str("4,4,4").unwrap();
+
+        let synced_low = new_distance_tier(0, &distance_tiers);
+        let synced_high = new_distance_tier(4, &distance_tiers);
+
+        let small_low = new_distance_tier(5, &distance_tiers);
+        let small_high = new_distance_tier(8, &distance_tiers);
+
+        let medium_low = new_distance_tier(9, &distance_tiers);
+        let medium_high = new_distance_tier(12, &distance_tiers);
+
+        let large = new_distance_tier(13, &distance_tiers);
+
+        assert_eq!(synced_low.tier, 1);
+        assert_eq!(synced_high.tier, 1);
+        assert_eq!(small_low.tier, 2);
+        assert_eq!(small_high.tier, 2);
+        assert_eq!(medium_low.tier, 4);
+        assert_eq!(medium_high.tier, 4);
+        assert_eq!(large.tier, 10);
+    }
+}
diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs
index af11d82eb53..665eaf0a0f7 100644
--- a/validator_client/src/block_service.rs
+++ b/validator_client/src/block_service.rs
@@ -1,9 +1,8 @@
 use crate::beacon_node_fallback::{Error as FallbackError, Errors};
 use crate::{
-    beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced},
+    beacon_node_fallback::{ApiTopic, BeaconNodeFallback},
     determine_graffiti,
     graffiti_file::GraffitiFile,
-    OfflineOnFailure,
 };
 use crate::{
     http_metrics::metrics,
@@ -141,26 +140,16 @@ pub struct ProposerFallback<T, E> {
 
 impl<T: SlotClock + 'static, E: EthSpec> ProposerFallback<T, E> {
     // Try `func` on `self.proposer_nodes` first. If that doesn't work, try `self.beacon_nodes`.
-    pub async fn request_proposers_first<'a, F, Err, R>(
-        &'a self,
-        require_synced: RequireSynced,
-        offline_on_failure: OfflineOnFailure,
-        func: F,
-    ) -> Result<(), Errors<Err>>
+    pub async fn request_proposers_first<F, Err, R>(&self, func: F) -> Result<(), Errors<Err>>
     where
-        F: Fn(&'a BeaconNodeHttpClient) -> R + Clone,
+        F: Fn(BeaconNodeHttpClient) -> R + Clone,
         R: Future<Output = Result<(), Err>>,
         Err: Debug,
     {
         // If there are proposer nodes, try calling `func` on them and return early if they are successful.
         if let Some(proposer_nodes) = &self.proposer_nodes {
             if proposer_nodes
-                .request(
-                    require_synced,
-                    offline_on_failure,
-                    ApiTopic::Blocks,
-                    func.clone(),
-                )
+                .request(ApiTopic::Blocks, func.clone())
                 .await
                 .is_ok()
             {
@@ -169,28 +158,18 @@ impl<T: SlotClock + 'static, E: EthSpec> ProposerFallback<T, E> {
         }
 
         // If the proposer nodes failed, try on the non-proposer nodes.
-        self.beacon_nodes
-            .request(require_synced, offline_on_failure, ApiTopic::Blocks, func)
-            .await
+        self.beacon_nodes.request(ApiTopic::Blocks, func).await
     }
 
     // Try `func` on `self.beacon_nodes` first. If that doesn't work, try `self.proposer_nodes`.
-    pub async fn request_proposers_last<'a, F, O, Err, R>(
-        &'a self,
-        require_synced: RequireSynced,
-        offline_on_failure: OfflineOnFailure,
-        func: F,
-    ) -> Result<O, Errors<Err>>
+    pub async fn request_proposers_last<F, O, Err, R>(&self, func: F) -> Result<O, Errors<Err>>
     where
-        F: Fn(&'a BeaconNodeHttpClient) -> R + Clone,
+        F: Fn(BeaconNodeHttpClient) -> R + Clone,
         R: Future<Output = Result<O, Err>>,
         Err: Debug,
     {
         // Try running `func` on the non-proposer beacon nodes.
-        let beacon_nodes_result = self
-            .beacon_nodes
-            .first_success(require_synced, offline_on_failure, func.clone())
-            .await;
+        let beacon_nodes_result = self.beacon_nodes.first_success(func.clone()).await;
 
         match (beacon_nodes_result, &self.proposer_nodes) {
             // The non-proposer node call succeed, return the result.
@@ -198,11 +177,7 @@ impl<T: SlotClock + 'static, E: EthSpec> ProposerFallback<T, E> {
             // The non-proposer node call failed, but we don't have any proposer nodes. Return an error.
             (Err(e), None) => Err(e),
             // The non-proposer node call failed, try the same call on the proposer nodes.
- (Err(_), Some(proposer_nodes)) => { - proposer_nodes - .first_success(require_synced, offline_on_failure, func) - .await - } + (Err(_), Some(proposer_nodes)) => proposer_nodes.first_success(func).await, } } } @@ -211,8 +186,8 @@ impl ProposerFallback { pub struct Inner<T, E: EthSpec> { validator_store: Arc<ValidatorStore<T, E>>, slot_clock: Arc<T>, - beacon_nodes: Arc<BeaconNodeFallback<T, E>>, - proposer_nodes: Option<Arc<BeaconNodeFallback<T, E>>>, + pub(crate) beacon_nodes: Arc<BeaconNodeFallback<T, E>>, + pub(crate) proposer_nodes: Option<Arc<BeaconNodeFallback<T, E>>>, context: RuntimeContext<E>, graffiti: Option<Graffiti>, graffiti_file: Option<GraffitiFile>, @@ -418,14 +393,10 @@ impl BlockService { // protect them from DoS attacks and they're most likely to successfully // publish a block. proposer_fallback - .request_proposers_first( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async { - self.publish_signed_block_contents(&signed_block, beacon_node) - .await - }, - ) + .request_proposers_first(|beacon_node| async { + self.publish_signed_block_contents(&signed_block, beacon_node) + .await + }) .await?; info!( @@ -503,32 +474,28 @@ impl BlockService { // Try the proposer nodes last, since it's likely that they don't have a // great view of attestations on the network. let unsigned_block = proposer_fallback - .request_proposers_last( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let _get_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_GET], - ); - Self::get_validator_block( - beacon_node, - slot, - randao_reveal_ref, - graffiti, - proposer_index, - builder_boost_factor, - log, - ) - .await - .map_err(|e| { - BlockError::Recoverable(format!( - "Error from beacon node when producing block: {:?}", - e - )) - }) - }, - ) + .request_proposers_last(|beacon_node| async move { + let _get_timer = metrics::start_timer_vec( + &metrics::BLOCK_SERVICE_TIMES, + &[metrics::BEACON_BLOCK_HTTP_GET], + ); + Self::get_validator_block( + &beacon_node, + slot, + randao_reveal_ref, + graffiti, + proposer_index, + builder_boost_factor, + log, + ) + .await + .map_err(|e| { + BlockError::Recoverable(format!( + "Error from beacon node when producing block: {:?}", + e + )) + }) + }) .await?; self_ref @@ -547,7 +514,7 @@ impl BlockService { async fn publish_signed_block_contents( &self, signed_block: &SignedBlock<E>, - beacon_node: &BeaconNodeHttpClient, + beacon_node: BeaconNodeHttpClient, ) -> Result<(), BlockError> { let log = self.context.log(); let slot = signed_block.slot(); diff --git a/validator_client/src/check_synced.rs b/validator_client/src/check_synced.rs index 6437682512d..2e9a62ff65a 100644 --- a/validator_client/src/check_synced.rs +++ b/validator_client/src/check_synced.rs @@ -1,80 +1,27 @@ use crate::beacon_node_fallback::CandidateError; -use eth2::BeaconNodeHttpClient; -use slog::{debug, error, warn, Logger}; -use slot_clock::SlotClock; +use eth2::{types::Slot, BeaconNodeHttpClient}; +use slog::{warn, Logger}; -/// A distance in slots. -const SYNC_TOLERANCE: u64 = 4; - -/// Returns -/// -/// `Ok(())` if the beacon node is synced and ready for action, -/// `Err(CandidateError::Offline)` if the beacon node is unreachable, -/// `Err(CandidateError::NotSynced)` if the beacon node indicates that it is syncing **AND** -/// it is more than `SYNC_TOLERANCE` behind the highest -/// known slot. -/// -/// The second condition means the even if the beacon node thinks that it's syncing, we'll still -/// try to use it if it's close enough to the head.
-pub async fn check_synced<T: SlotClock>( +pub async fn check_node_health( beacon_node: &BeaconNodeHttpClient, - slot_clock: &T, - log_opt: Option<&Logger>, -) -> Result<(), CandidateError> { + log: &Logger, +) -> Result<(Slot, bool, bool), CandidateError> { let resp = match beacon_node.get_node_syncing().await { Ok(resp) => resp, Err(e) => { - if let Some(log) = log_opt { - warn!( - log, - "Unable connect to beacon node"; - "error" => %e - ) - } - - return Err(CandidateError::Offline); - } - }; - - let bn_is_synced = !resp.data.is_syncing || (resp.data.sync_distance.as_u64() < SYNC_TOLERANCE); - let is_synced = bn_is_synced && !resp.data.el_offline; - - if let Some(log) = log_opt { - if !is_synced { - debug!( - log, - "Beacon node sync status"; - "status" => format!("{:?}", resp), - ); - warn!( log, - "Beacon node is not synced"; - "sync_distance" => resp.data.sync_distance.as_u64(), - "head_slot" => resp.data.head_slot.as_u64(), - "endpoint" => %beacon_node, - "el_offline" => resp.data.el_offline, + "Unable to connect to beacon node"; + "error" => %e ); - } - if let Some(local_slot) = slot_clock.now() { - let remote_slot = resp.data.head_slot + resp.data.sync_distance; - if remote_slot + 1 < local_slot || local_slot + 1 < remote_slot { - error!( - log, - "Time discrepancy with beacon node"; - "msg" => "check the system time on this host and the beacon node", - "beacon_node_slot" => remote_slot, - "local_slot" => local_slot, - "endpoint" => %beacon_node, - ); - } + return Err(CandidateError::Offline); } - } + }; - if is_synced { - Ok(()) - } else { - Err(CandidateError::NotSynced) - } + Ok(( + resp.data.head_slot, + resp.data.is_optimistic, + resp.data.el_offline, + )) } diff --git a/validator_client/src/cli.rs b/validator_client/src/cli.rs index f84260a9243..b027ad0df6d 100644 --- a/validator_client/src/cli.rs +++ b/validator_client/src/cli.rs @@ -444,6 +444,33 @@ pub fn cli_app() -> Command { .help_heading(FLAG_HEADER) .display_order(0) ) + .arg( + Arg::new("beacon-nodes-sync-tolerances") + .long("beacon-nodes-sync-tolerances") + .value_name("SYNC_TOLERANCES") + .help("A comma-separated list of 3 values which sets the size of each sync distance range when \ determining the health of each connected beacon node. \ The first value determines the `Synced` range. \ If a connected beacon node is synced to within this number of slots it is considered 'Synced'. \ The second value determines the `Small` sync distance range. \ This range starts immediately after the `Synced` range. \ The third value determines the `Medium` sync distance range. \ This range starts immediately after the `Small` range. \ Any sync distance value beyond that is considered `Large`. \ For example, a value of `8,8,48` would have ranges like the following: \ `Synced`: 0..=8 \ `Small`: 9..=16 \ `Medium`: 17..=64 \ `Large`: 65.. \ These values are used to determine what ordering beacon node fallbacks are used in. \ Generally, `Synced` nodes are preferred over `Small` and so on. \ Nodes in the `Synced` range will tie-break based on their ordering in `--beacon-nodes`. \ This ensures the primary beacon node is prioritised.
\ + [default: 8,8,48]") + .action(ArgAction::Set) + .help_heading(FLAG_HEADER) + .display_order(0) + ) .arg( Arg::new("disable-slashing-protection-web3signer") .long("disable-slashing-protection-web3signer") diff --git a/validator_client/src/config.rs b/validator_client/src/config.rs index 204c5b8b6cc..c2c445c48c3 100644 --- a/validator_client/src/config.rs +++ b/validator_client/src/config.rs @@ -1,6 +1,8 @@ use crate::beacon_node_fallback::ApiTopic; use crate::graffiti_file::GraffitiFile; -use crate::{http_api, http_metrics}; +use crate::{ + beacon_node_fallback, beacon_node_health::BeaconNodeSyncDistanceTiers, http_api, http_metrics, +}; use clap::ArgMatches; use clap_utils::{flags::DISABLE_MALLOC_TUNING_FLAG, parse_optional, parse_required}; use directory::{ @@ -14,6 +16,7 @@ use slog::{info, warn, Logger}; use std::fs; use std::net::IpAddr; use std::path::PathBuf; +use std::str::FromStr; use std::time::Duration; use types::{Address, GRAFFITI_BYTES_LEN}; @@ -21,7 +24,7 @@ pub const DEFAULT_BEACON_NODE: &str = "http://localhost:5052/"; pub const DEFAULT_WEB3SIGNER_KEEP_ALIVE: Option<Duration> = Some(Duration::from_secs(20)); /// Stores the core configuration for this validator instance. -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Config { /// The data directory, which stores all validator databases pub validator_dir: PathBuf, @@ -52,6 +55,8 @@ pub struct Config { pub http_api: http_api::Config, /// Configuration for the HTTP REST API. pub http_metrics: http_metrics::Config, + /// Configuration for the Beacon Node fallback. + pub beacon_node_fallback: beacon_node_fallback::Config, /// Configuration for sending metrics to a remote explorer endpoint. pub monitoring_api: Option<monitoring_api::Config>, /// If true, enable functionality that monitors the network for attestations or proposals from @@ -117,6 +122,7 @@ impl Default for Config { fee_recipient: None, http_api: <_>::default(), http_metrics: <_>::default(), + beacon_node_fallback: <_>::default(), monitoring_api: None, enable_doppelganger_protection: false, enable_high_validator_count_metrics: false, @@ -258,6 +264,16 @@ impl Config { .collect::<Result<_, _>>()?; } + /* + * Beacon node fallback + */ + if let Some(sync_tolerance) = cli_args.get_one::<String>("beacon-nodes-sync-tolerances") { + config.beacon_node_fallback.sync_tolerances = + BeaconNodeSyncDistanceTiers::from_str(sync_tolerance)?; + } else { + config.beacon_node_fallback.sync_tolerances = BeaconNodeSyncDistanceTiers::default(); + } + /* * Web3 signer */ diff --git a/validator_client/src/doppelganger_service.rs b/validator_client/src/doppelganger_service.rs index 2c8eca85601..1d552cc5ad9 100644 --- a/validator_client/src/doppelganger_service.rs +++ b/validator_client/src/doppelganger_service.rs @@ -29,9 +29,8 @@ //! //! Doppelganger protection is a best-effort, last-line-of-defence mitigation. Do not rely upon it. -use crate::beacon_node_fallback::{BeaconNodeFallback, RequireSynced}; +use crate::beacon_node_fallback::BeaconNodeFallback; use crate::validator_store::ValidatorStore; -use crate::OfflineOnFailure; use environment::RuntimeContext; use eth2::types::LivenessResponseData; use parking_lot::RwLock; @@ -175,12 +174,11 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>( } else { // Request the previous epoch liveness state from the beacon node.
beacon_nodes - .first_success( - RequireSynced::Yes, - OfflineOnFailure::Yes, - |beacon_node| async { + .first_success(|beacon_node| { + let validator_indices_ref = &validator_indices; + async move { beacon_node - .post_validator_liveness_epoch(previous_epoch, &validator_indices) + .post_validator_liveness_epoch(previous_epoch, validator_indices_ref) .await .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) .map(|result| { @@ -194,8 +192,8 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>( }) .collect() }) - }, - ) + } + }) .await .unwrap_or_else(|e| { crit!( @@ -212,12 +210,11 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>( // Request the current epoch liveness state from the beacon node. let current_epoch_responses = beacon_nodes - .first_success( - RequireSynced::Yes, - OfflineOnFailure::Yes, - |beacon_node| async { + .first_success(|beacon_node| { + let validator_indices_ref = &validator_indices; + async move { beacon_node - .post_validator_liveness_epoch(current_epoch, &validator_indices) + .post_validator_liveness_epoch(current_epoch, validator_indices_ref) .await .map_err(|e| format!("Failed query for validator liveness: {:?}", e)) .map(|result| { @@ -231,8 +228,8 @@ async fn beacon_node_liveness<'a, T: 'static + SlotClock, E: EthSpec>( }) .collect() }) - }, - ) + } + }) .await .unwrap_or_else(|e| { crit!( diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 1c205b38e5d..cf8d4997920 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -8,7 +8,7 @@ pub mod sync; -use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, OfflineOnFailure, RequireSynced}; +use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; use crate::http_metrics::metrics::{get_int_gauge, set_int_gauge, ATTESTATION_DUTY}; use crate::{ block_service::BlockServiceNotification, @@ -517,22 +517,18 @@ async fn poll_validator_indices( // Query the remote BN to resolve a pubkey to a validator index. 
let download_result = duties_service .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::DUTIES_SERVICE_TIMES, - &[metrics::VALIDATOR_ID_HTTP_GET], - ); - beacon_node - .get_beacon_states_validator_id( - StateId::Head, - &ValidatorId::PublicKey(pubkey), - ) - .await - }, - ) + .first_success(|beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::VALIDATOR_ID_HTTP_GET], + ); + beacon_node + .get_beacon_states_validator_id( + StateId::Head, + &ValidatorId::PublicKey(pubkey), + ) + .await + }) .await; let fee_recipient = duties_service @@ -744,20 +740,15 @@ async fn poll_beacon_attesters( let subscriptions_ref = &subscriptions; let subscription_result = duties_service .beacon_nodes - .request( - RequireSynced::No, - OfflineOnFailure::Yes, - ApiTopic::Subscriptions, - |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::DUTIES_SERVICE_TIMES, - &[metrics::SUBSCRIPTIONS_HTTP_POST], - ); - beacon_node - .post_validator_beacon_committee_subscriptions(subscriptions_ref) - .await - }, - ) + .request(ApiTopic::Subscriptions, |beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::SUBSCRIPTIONS_HTTP_POST], + ); + beacon_node + .post_validator_beacon_committee_subscriptions(subscriptions_ref) + .await + }) .await; if subscription_result.as_ref().is_ok() { debug!( @@ -769,7 +760,7 @@ async fn poll_beacon_attesters( subscription_slots.record_successful_subscription_at(current_slot); } } else if let Err(e) = subscription_result { - if e.num_errors() < duties_service.beacon_nodes.num_total() { + if e.num_errors() < duties_service.beacon_nodes.num_total().await { warn!( log, "Some subscriptions failed"; @@ -1037,19 +1028,15 @@ async fn post_validator_duties_attester( ) -> Result>, Error> { duties_service .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::DUTIES_SERVICE_TIMES, - &[metrics::ATTESTER_DUTIES_HTTP_POST], - ); - beacon_node - .post_validator_duties_attester(epoch, validator_indices) - .await - }, - ) + .first_success(|beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::ATTESTER_DUTIES_HTTP_POST], + ); + beacon_node + .post_validator_duties_attester(epoch, validator_indices) + .await + }) .await .map_err(|e| Error::FailedToDownloadAttesters(e.to_string())) } @@ -1273,19 +1260,15 @@ async fn poll_beacon_proposers( if !local_pubkeys.is_empty() { let download_result = duties_service .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let _timer = metrics::start_timer_vec( - &metrics::DUTIES_SERVICE_TIMES, - &[metrics::PROPOSER_DUTIES_HTTP_GET], - ); - beacon_node - .get_validator_duties_proposer(current_epoch) - .await - }, - ) + .first_success(|beacon_node| async move { + let _timer = metrics::start_timer_vec( + &metrics::DUTIES_SERVICE_TIMES, + &[metrics::PROPOSER_DUTIES_HTTP_GET], + ); + beacon_node + .get_validator_duties_proposer(current_epoch) + .await + }) .await; match download_result { diff --git a/validator_client/src/duties_service/sync.rs b/validator_client/src/duties_service/sync.rs index 3618b47146f..0bd99dc638b 100644 --- a/validator_client/src/duties_service/sync.rs +++ 
b/validator_client/src/duties_service/sync.rs @@ -1,4 +1,3 @@ -use crate::beacon_node_fallback::{OfflineOnFailure, RequireSynced}; use crate::{ doppelganger_service::DoppelgangerStatus, duties_service::{DutiesService, Error}, @@ -442,19 +441,15 @@ pub async fn poll_sync_committee_duties_for_period for Error { pub struct Context { pub task_executor: TaskExecutor, pub api_secret: ApiSecret, + pub block_service: Option>, pub validator_store: Option>>, pub validator_dir: Option, pub secrets_dir: Option, @@ -169,6 +171,17 @@ pub fn serve( } }; + let inner_block_service = ctx.block_service.clone(); + let block_service_filter = warp::any() + .map(move || inner_block_service.clone()) + .and_then(|block_service: Option<_>| async move { + block_service.ok_or_else(|| { + warp_utils::reject::custom_not_found( + "block service is not initialized.".to_string(), + ) + }) + }); + let inner_validator_store = ctx.validator_store.clone(); let validator_store_filter = warp::any() .map(move || inner_validator_store.clone()) @@ -398,6 +411,40 @@ pub fn serve( }, ); + // GET lighthouse/ui/fallback_health + let get_lighthouse_ui_fallback_health = warp::path("lighthouse") + .and(warp::path("ui")) + .and(warp::path("fallback_health")) + .and(warp::path::end()) + .and(block_service_filter.clone()) + .then(|block_filter: BlockService| async move { + let mut result: HashMap> = HashMap::new(); + + let mut beacon_nodes = Vec::new(); + for node in &*block_filter.beacon_nodes.candidates.read().await { + beacon_nodes.push(CandidateInfo { + index: node.index, + endpoint: node.beacon_node.to_string(), + health: *node.health.read().await, + }); + } + result.insert("beacon_nodes".to_string(), beacon_nodes); + + if let Some(proposer_nodes_list) = &block_filter.proposer_nodes { + let mut proposer_nodes = Vec::new(); + for node in &*proposer_nodes_list.candidates.read().await { + proposer_nodes.push(CandidateInfo { + index: node.index, + endpoint: node.beacon_node.to_string(), + health: *node.health.read().await, + }); + } + result.insert("proposer_nodes".to_string(), proposer_nodes); + } + + blocking_json_task(move || Ok(api_types::GenericResponse::from(result))).await + }); + // POST lighthouse/validators/ let post_validators = warp::path("lighthouse") .and(warp::path("validators")) @@ -1253,6 +1300,7 @@ pub fn serve( .or(get_lighthouse_validators_pubkey) .or(get_lighthouse_ui_health) .or(get_lighthouse_ui_graffiti) + .or(get_lighthouse_ui_fallback_health) .or(get_fee_recipient) .or(get_gas_limit) .or(get_graffiti) diff --git a/validator_client/src/http_api/test_utils.rs b/validator_client/src/http_api/test_utils.rs index 6c0e8b1617c..119c611553e 100644 --- a/validator_client/src/http_api/test_utils.rs +++ b/validator_client/src/http_api/test_utils.rs @@ -127,6 +127,7 @@ impl ApiTester { let context = Arc::new(Context { task_executor: test_runtime.task_executor.clone(), api_secret, + block_service: None, validator_dir: Some(validator_dir.path().into()), secrets_dir: Some(secrets_dir.path().into()), validator_store: Some(validator_store.clone()), diff --git a/validator_client/src/http_api/tests.rs b/validator_client/src/http_api/tests.rs index 98fbc854ae9..ba3b7f685b9 100644 --- a/validator_client/src/http_api/tests.rs +++ b/validator_client/src/http_api/tests.rs @@ -115,6 +115,7 @@ impl ApiTester { let context = Arc::new(Context { task_executor: test_runtime.task_executor.clone(), api_secret, + block_service: None, validator_dir: Some(validator_dir.path().into()), secrets_dir: Some(secrets_dir.path().into()), validator_store: 
Some(validator_store.clone()), diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index dff50582dfe..9a02ffdefb3 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -1,5 +1,6 @@ mod attestation_service; mod beacon_node_fallback; +mod beacon_node_health; mod block_service; mod check_synced; mod cli; @@ -20,6 +21,7 @@ pub mod initialized_validators; pub mod validator_store; pub use beacon_node_fallback::ApiTopic; +pub use beacon_node_health::BeaconNodeSyncDistanceTiers; pub use cli::cli_app; pub use config::Config; use initialized_validators::InitializedValidators; @@ -29,8 +31,7 @@ use sensitive_url::SensitiveUrl; pub use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME}; use crate::beacon_node_fallback::{ - start_fallback_updater_service, BeaconNodeFallback, CandidateBeaconNode, OfflineOnFailure, - RequireSynced, + start_fallback_updater_service, BeaconNodeFallback, CandidateBeaconNode, }; use crate::doppelganger_service::DoppelgangerService; use crate::graffiti_file::GraffitiFile; @@ -364,15 +365,21 @@ impl ProductionValidatorClient { .collect::<Result<Vec<_>, String>>()?; let num_nodes = beacon_nodes.len(); + // User order of `beacon_nodes` is preserved, so `index` corresponds to the position of + // the node in `--beacon-nodes`. let candidates = beacon_nodes .into_iter() - .map(CandidateBeaconNode::new) + .enumerate() + .map(|(index, node)| CandidateBeaconNode::new(node, index)) .collect(); let proposer_nodes_num = proposer_nodes.len(); + // User order of `proposer_nodes` is preserved, so `index` corresponds to the position of + // the node in `--proposer-nodes`. let proposer_candidates = proposer_nodes .into_iter() - .map(CandidateBeaconNode::new) + .enumerate() + .map(|(index, node)| CandidateBeaconNode::new(node, index)) .collect(); // Set the count for beacon node fallbacks excluding the primary beacon node.
@@ -394,6 +401,7 @@ impl ProductionValidatorClient { let mut beacon_nodes: BeaconNodeFallback<_, E> = BeaconNodeFallback::new( candidates, + config.beacon_node_fallback, config.broadcast_topics.clone(), context.eth2_config.spec.clone(), log.clone(), @@ -401,6 +409,7 @@ impl ProductionValidatorClient { let mut proposer_nodes: BeaconNodeFallback<_, E> = BeaconNodeFallback::new( proposer_candidates, + config.beacon_node_fallback, config.broadcast_topics.clone(), context.eth2_config.spec.clone(), log.clone(), @@ -563,6 +572,7 @@ impl ProductionValidatorClient { let ctx = Arc::new(http_api::Context { task_executor: self.context.executor.clone(), api_secret, + block_service: Some(self.block_service.clone()), validator_store: Some(self.validator_store.clone()), validator_dir: Some(self.config.validator_dir.clone()), secrets_dir: Some(self.config.secrets_dir.clone()), @@ -655,10 +665,10 @@ async fn init_from_beacon_node( proposer_nodes.update_all_candidates().await; let num_available = beacon_nodes.num_available().await; - let num_total = beacon_nodes.num_total(); + let num_total = beacon_nodes.num_total().await; let proposer_available = proposer_nodes.num_available().await; - let proposer_total = proposer_nodes.num_total(); + let proposer_total = proposer_nodes.num_total().await; if proposer_total > 0 && proposer_available == 0 { warn!( @@ -704,11 +714,7 @@ async fn init_from_beacon_node( let genesis = loop { match beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |node| async move { node.get_beacon_genesis().await }, - ) + .first_success(|node| async move { node.get_beacon_genesis().await }) .await { Ok(genesis) => break genesis.data, @@ -795,11 +801,7 @@ async fn poll_whilst_waiting_for_genesis( ) -> Result<(), String> { loop { match beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { beacon_node.get_lighthouse_staking().await }, - ) + .first_success(|beacon_node| async move { beacon_node.get_lighthouse_staking().await }) .await { Ok(is_staking) => { diff --git a/validator_client/src/notifier.rs b/validator_client/src/notifier.rs index 819201978f8..00d7b14de7c 100644 --- a/validator_client/src/notifier.rs +++ b/validator_client/src/notifier.rs @@ -1,7 +1,7 @@ use crate::http_metrics; use crate::{DutiesService, ProductionValidatorClient}; use lighthouse_metrics::set_gauge; -use slog::{error, info, Logger}; +use slog::{debug, error, info, Logger}; use slot_clock::SlotClock; use tokio::time::{sleep, Duration}; use types::EthSpec; @@ -39,25 +39,32 @@ async fn notify( duties_service: &DutiesService, log: &Logger, ) { - let num_available = duties_service.beacon_nodes.num_available().await; + let (candidate_info, num_available, num_synced) = + duties_service.beacon_nodes.get_notifier_info().await; + let num_total = candidate_info.len(); + let num_synced_fallback = num_synced.saturating_sub(1); + set_gauge( &http_metrics::metrics::AVAILABLE_BEACON_NODES_COUNT, num_available as i64, ); - let num_synced = duties_service.beacon_nodes.num_synced().await; set_gauge( &http_metrics::metrics::SYNCED_BEACON_NODES_COUNT, num_synced as i64, ); - let num_total = duties_service.beacon_nodes.num_total(); set_gauge( &http_metrics::metrics::TOTAL_BEACON_NODES_COUNT, num_total as i64, ); if num_synced > 0 { + let primary = candidate_info + .first() + .map(|candidate| candidate.endpoint.as_str()) + .unwrap_or("None"); info!( log, "Connected to beacon node(s)"; + "primary" => primary, "total" => num_total, "available" => num_available, 
"synced" => num_synced, @@ -71,13 +78,36 @@ async fn notify( "synced" => num_synced, ) } - let num_synced_fallback = duties_service.beacon_nodes.num_synced_fallback().await; if num_synced_fallback > 0 { set_gauge(&http_metrics::metrics::ETH2_FALLBACK_CONNECTED, 1); } else { set_gauge(&http_metrics::metrics::ETH2_FALLBACK_CONNECTED, 0); } + for info in candidate_info { + if let Ok(health) = info.health { + debug!( + log, + "Beacon node info"; + "status" => "Connected", + "index" => info.index, + "endpoint" => info.endpoint, + "head_slot" => %health.head, + "is_optimistic" => ?health.optimistic_status, + "execution_engine_status" => ?health.execution_status, + "health_tier" => %health.health_tier, + ); + } else { + debug!( + log, + "Beacon node info"; + "status" => "Disconnected", + "index" => info.index, + "endpoint" => info.endpoint, + ); + } + } + if let Some(slot) = duties_service.slot_clock.now() { let epoch = slot.epoch(E::slots_per_epoch()); diff --git a/validator_client/src/preparation_service.rs b/validator_client/src/preparation_service.rs index 474f9f47609..010c651c25d 100644 --- a/validator_client/src/preparation_service.rs +++ b/validator_client/src/preparation_service.rs @@ -1,6 +1,5 @@ -use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced}; +use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; use crate::validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore}; -use crate::OfflineOnFailure; use bls::PublicKeyBytes; use environment::RuntimeContext; use parking_lot::RwLock; @@ -342,16 +341,11 @@ impl PreparationService { let preparation_entries = preparation_data.as_slice(); match self .beacon_nodes - .request( - RequireSynced::No, - OfflineOnFailure::Yes, - ApiTopic::Subscriptions, - |beacon_node| async move { - beacon_node - .post_validator_prepare_beacon_proposer(preparation_entries) - .await - }, - ) + .request(ApiTopic::Subscriptions, |beacon_node| async move { + beacon_node + .post_validator_prepare_beacon_proposer(preparation_entries) + .await + }) .await { Ok(()) => debug!( @@ -477,13 +471,9 @@ impl PreparationService { for batch in signed.chunks(self.validator_registration_batch_size) { match self .beacon_nodes - .broadcast( - RequireSynced::No, - OfflineOnFailure::No, - |beacon_node| async move { - beacon_node.post_validator_register_validator(batch).await - }, - ) + .broadcast(|beacon_node| async move { + beacon_node.post_validator_register_validator(batch).await + }) .await { Ok(()) => info!( diff --git a/validator_client/src/sync_committee_service.rs b/validator_client/src/sync_committee_service.rs index f7abb3855a3..5c02998e3fc 100644 --- a/validator_client/src/sync_committee_service.rs +++ b/validator_client/src/sync_committee_service.rs @@ -1,8 +1,7 @@ -use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback, RequireSynced}; +use crate::beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; use crate::{ duties_service::DutiesService, validator_store::{Error as ValidatorStoreError, ValidatorStore}, - OfflineOnFailure, }; use environment::RuntimeContext; use eth2::types::BlockId; @@ -180,8 +179,6 @@ impl SyncCommitteeService { let response = self .beacon_nodes .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, |beacon_node| async move { match beacon_node.get_beacon_blocks_root(BlockId::Head).await { Ok(Some(block)) if block.execution_optimistic == Some(false) => { @@ -299,16 +296,11 @@ impl SyncCommitteeService { .collect::>(); self.beacon_nodes - .request( - RequireSynced::No, - 
OfflineOnFailure::Yes, - ApiTopic::SyncCommittee, - |beacon_node| async move { - beacon_node - .post_beacon_pool_sync_committee_signatures(committee_signatures) - .await - }, - ) + .request(ApiTopic::SyncCommittee, |beacon_node| async move { + beacon_node + .post_beacon_pool_sync_committee_signatures(committee_signatures) + .await + }) .await .map_err(|e| { error!( @@ -371,21 +363,17 @@ impl SyncCommitteeService { let contribution = &self .beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - let sync_contribution_data = SyncContributionData { - slot, - beacon_block_root, - subcommittee_index: subnet_id.into(), - }; + .first_success(|beacon_node| async move { + let sync_contribution_data = SyncContributionData { + slot, + beacon_block_root, + subcommittee_index: subnet_id.into(), + }; - beacon_node - .get_validator_sync_committee_contribution::(&sync_contribution_data) - .await - }, - ) + beacon_node + .get_validator_sync_committee_contribution::(&sync_contribution_data) + .await + }) .await .map_err(|e| { crit!( @@ -453,15 +441,11 @@ impl SyncCommitteeService { // Publish to the beacon node. self.beacon_nodes - .first_success( - RequireSynced::No, - OfflineOnFailure::Yes, - |beacon_node| async move { - beacon_node - .post_validator_contribution_and_proofs(signed_contributions) - .await - }, - ) + .first_success(|beacon_node| async move { + beacon_node + .post_validator_contribution_and_proofs(signed_contributions) + .await + }) .await .map_err(|e| { error!( @@ -595,16 +579,11 @@ impl SyncCommitteeService { if let Err(e) = self .beacon_nodes - .request( - RequireSynced::No, - OfflineOnFailure::Yes, - ApiTopic::Subscriptions, - |beacon_node| async move { - beacon_node - .post_validator_sync_committee_subscriptions(subscriptions_slice) - .await - }, - ) + .request(ApiTopic::Subscriptions, |beacon_node| async move { + beacon_node + .post_validator_sync_committee_subscriptions(subscriptions_slice) + .await + }) .await { error!(
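For reference, the tier arithmetic that `--beacon-nodes-sync-tolerances` configures (documented in the CLI help and exercised by the `sync_tolerance_default` and `sync_tolerance_from_str` tests earlier in this diff) is cumulative: each range starts immediately after the previous one ends. The following is a minimal standalone sketch of that mapping, not part of the patch itself; `Tier` and `compute_tier` are illustrative names, not the API this diff adds.

use std::str::FromStr;

#[derive(Debug, PartialEq)]
enum Tier {
    Synced,
    Small,
    Medium,
    Large,
}

// Map a sync distance (in slots) to a tier, given the three
// comma-separated tolerances, e.g. "8,8,48".
fn compute_tier(distance: u64, tolerances: &str) -> Result<Tier, String> {
    let values: Vec<u64> = tolerances
        .split(',')
        .map(|v| u64::from_str(v.trim()).map_err(|e| e.to_string()))
        .collect::<Result<_, _>>()?;
    let [synced, small, medium]: [u64; 3] = values
        .try_into()
        .map_err(|_| "expected exactly 3 values".to_string())?;
    // Cumulative boundaries: `8,8,48` gives
    // Synced: 0..=8, Small: 9..=16, Medium: 17..=64, Large: 65..
    Ok(if distance <= synced {
        Tier::Synced
    } else if distance <= synced + small {
        Tier::Small
    } else if distance <= synced + small + medium {
        Tier::Medium
    } else {
        Tier::Large
    })
}

fn main() {
    // Default tolerances, matching the flag's `[default: 8,8,48]`.
    assert_eq!(compute_tier(8, "8,8,48"), Ok(Tier::Synced));
    assert_eq!(compute_tier(16, "8,8,48"), Ok(Tier::Small));
    assert_eq!(compute_tier(64, "8,8,48"), Ok(Tier::Medium));
    assert_eq!(compute_tier(65, "8,8,48"), Ok(Tier::Large));
    // With `4,4,4`, matching `sync_tolerance_from_str`: `Large` starts at 13.
    assert_eq!(compute_tier(13, "4,4,4"), Ok(Tier::Large));
}

Nodes that land in the `Synced` range then tie-break on their `--beacon-nodes` ordering, which is why the primary beacon node is preferred when every candidate is healthy.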