From 591ffc0a4814cfd2323660164ec6ea3f5ba8f421 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Tue, 2 Jul 2024 13:09:15 +0200 Subject: [PATCH] fix(hermes): tune readiness probe This change modifies the not_behind readiness indicator to use the most recent slot from Pythnet accumulator instead of taking the max because we think that during forks validators might return an absurdly large slot. It also makes it more verbose by returning the indicators and the initial values for them so we can investigate better if the issue persists. --- apps/hermes/server/Cargo.lock | 2 +- apps/hermes/server/Cargo.toml | 2 +- apps/hermes/server/src/api/rest/ready.rs | 5 +- apps/hermes/server/src/state/aggregate.rs | 68 +++++++++++++++++------ 4 files changed, 55 insertions(+), 22 deletions(-) diff --git a/apps/hermes/server/Cargo.lock b/apps/hermes/server/Cargo.lock index ee91040047..3d8446876a 100644 --- a/apps/hermes/server/Cargo.lock +++ b/apps/hermes/server/Cargo.lock @@ -1796,7 +1796,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermes" -version = "0.5.14" +version = "0.5.15" dependencies = [ "anyhow", "async-trait", diff --git a/apps/hermes/server/Cargo.toml b/apps/hermes/server/Cargo.toml index 1892eb0a58..d2b5474acf 100644 --- a/apps/hermes/server/Cargo.toml +++ b/apps/hermes/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hermes" -version = "0.5.14" +version = "0.5.15" description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle." 
edition = "2021" diff --git a/apps/hermes/server/src/api/rest/ready.rs b/apps/hermes/server/src/api/rest/ready.rs index 255b5d87ce..87a57d8dc8 100644 --- a/apps/hermes/server/src/api/rest/ready.rs +++ b/apps/hermes/server/src/api/rest/ready.rs @@ -10,6 +10,7 @@ use { IntoResponse, Response, }, + Json, }, }; @@ -19,7 +20,7 @@ where { let state = &*state.state; match Aggregates::is_ready(state).await { - true => (StatusCode::OK, "OK").into_response(), - false => (StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable").into_response(), + (true, _) => (StatusCode::OK, "OK").into_response(), + (false, metadata) => (StatusCode::SERVICE_UNAVAILABLE, Json(metadata)).into_response(), } } diff --git a/apps/hermes/server/src/state/aggregate.rs b/apps/hermes/server/src/state/aggregate.rs index 14bfb2314e..a8b365e062 100644 --- a/apps/hermes/server/src/state/aggregate.rs +++ b/apps/hermes/server/src/state/aggregate.rs @@ -1,12 +1,10 @@ #[cfg(test)] use mock_instant::{ - Instant, SystemTime, UNIX_EPOCH, }; #[cfg(not(test))] use std::time::{ - Instant, SystemTime, UNIX_EPOCH, }; @@ -56,6 +54,7 @@ use { }, }, }, + serde::Serialize, std::{ collections::HashSet, time::Duration, @@ -116,10 +115,13 @@ pub struct AggregateStateData { pub latest_completed_slot: Option, /// Time of the latest completed update. This is used for the health probes. - pub latest_completed_update_at: Option, + pub latest_completed_update_time: Option, /// The latest observed slot among different Aggregate updates. This is used for the health - /// probes. + /// probes. The slot is not necessarily the maximum observed slot but it should be close + /// to the maximum. The maximum observed slot is not used because sometimes due to some + /// network issues we might receive an update with a much higher slot especially during + /// the forks. pub latest_observed_slot: Option, /// The duration of no aggregation after which the readiness of the state is considered stale. 
@@ -140,7 +142,7 @@ impl AggregateStateData { ) -> Self { Self { latest_completed_slot: None, - latest_completed_update_at: None, + latest_completed_update_time: None, latest_observed_slot: None, metrics: metrics::Metrics::new(metrics_registry), readiness_staleness_threshold, @@ -213,6 +215,17 @@ pub struct PriceFeedsWithUpdateData { pub update_data: Vec>, } +#[derive(Debug, Serialize)] +pub struct ReadinessMetadata { + pub has_completed_recently: bool, + pub is_not_behind: bool, + pub is_metadata_loaded: bool, + pub latest_completed_slot: Option, + pub latest_observed_slot: Option, + pub latest_completed_unix_timestamp: Option, + pub price_feeds_metadata_len: usize, +} + #[async_trait::async_trait] pub trait Aggregates where @@ -221,7 +234,7 @@ where Self: PriceFeedMeta, { fn subscribe(&self) -> Receiver; - async fn is_ready(&self) -> bool; + async fn is_ready(&self) -> (bool, ReadinessMetadata); async fn store_update(&self, update: Update) -> Result<()>; async fn get_price_feed_ids(&self) -> HashSet; async fn get_price_feeds_with_update_data( @@ -304,10 +317,7 @@ where // Update the aggregate state with the latest observed slot { let mut aggregate_state = self.into().data.write().await; - aggregate_state.latest_observed_slot = aggregate_state - .latest_observed_slot - .map(|latest| latest.max(slot)) - .or(Some(slot)); + aggregate_state.latest_observed_slot = Some(slot); } let accumulator_messages = self.fetch_accumulator_messages(slot).await?; @@ -366,8 +376,8 @@ where .or(Some(slot)); aggregate_state - .latest_completed_update_at - .replace(Instant::now()); + .latest_completed_update_time + .replace(SystemTime::now()); aggregate_state .metrics @@ -401,15 +411,20 @@ where .collect() } - async fn is_ready(&self) -> bool { + async fn is_ready(&self) -> (bool, ReadinessMetadata) { let state_data = self.into().data.read().await; let price_feeds_metadata = PriceFeedMeta::retrieve_price_feeds_metadata(self) .await .unwrap(); - let has_completed_recently = match 
state_data.latest_completed_update_at.as_ref() { + let current_time = SystemTime::now(); + + let has_completed_recently = match state_data.latest_completed_update_time { Some(latest_completed_update_time) => { - latest_completed_update_time.elapsed() < state_data.readiness_staleness_threshold + current_time + .duration_since(latest_completed_update_time) + .unwrap_or(Duration::from_secs(0)) + < state_data.readiness_staleness_threshold } None => false, }; @@ -426,7 +441,24 @@ where }; let is_metadata_loaded = !price_feeds_metadata.is_empty(); - has_completed_recently && is_not_behind && is_metadata_loaded + ( + has_completed_recently && is_not_behind && is_metadata_loaded, + ReadinessMetadata { + has_completed_recently, + is_not_behind, + is_metadata_loaded, + latest_completed_slot: state_data.latest_completed_slot, + latest_observed_slot: state_data.latest_observed_slot, + latest_completed_unix_timestamp: state_data.latest_completed_update_time.and_then( + |t| { + t.duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() as i64) + .ok() + }, + ), + price_feeds_metadata_len: price_feeds_metadata.len(), + }, + ) } } @@ -896,14 +928,14 @@ mod test { .unwrap(); // Check the state is ready - assert!(state.is_ready().await); + assert!(state.is_ready().await.0); // Advance the clock to make the prices stale let staleness_threshold = Duration::from_secs(30); MockClock::advance_system_time(staleness_threshold); MockClock::advance(staleness_threshold); // Check the state is not ready - assert!(!state.is_ready().await); + assert!(!state.is_ready().await.0); } /// Test that the state retains the latest slots upon cache eviction.