Skip to content

Commit

Permalink
fix(hermes): tune readiness probe (#1753)
Browse files Browse the repository at this point in the history
* fix(hermes): tune readiness probe

This change modifies the not_behind readiness indicator to use the
most recent slot from Pythnet accumulator instead of taking the max
because we think that during forks validators might return an absurdly
large slot. It also makes it more verbose by returning the indicators
and the initial values for them so we can investigate better if the
issue persists.

* fix: use saturating sub to avoid overflow issue
  • Loading branch information
ali-bahjati authored Jul 4, 2024
1 parent 7ef9aa2 commit 29afb2a
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 23 deletions.
2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.5.14"
version = "0.5.15"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down
5 changes: 3 additions & 2 deletions apps/hermes/server/src/api/rest/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use {
IntoResponse,
Response,
},
Json,
},
};

Expand All @@ -19,7 +20,7 @@ where
{
let state = &*state.state;
match Aggregates::is_ready(state).await {
true => (StatusCode::OK, "OK").into_response(),
false => (StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable").into_response(),
(true, _) => (StatusCode::OK, "OK").into_response(),
(false, metadata) => (StatusCode::SERVICE_UNAVAILABLE, Json(metadata)).into_response(),
}
}
70 changes: 51 additions & 19 deletions apps/hermes/server/src/state/aggregate.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
#[cfg(test)]
use mock_instant::{
Instant,
SystemTime,
UNIX_EPOCH,
};
#[cfg(not(test))]
use std::time::{
Instant,
SystemTime,
UNIX_EPOCH,
};
Expand Down Expand Up @@ -56,6 +54,7 @@ use {
},
},
},
serde::Serialize,
std::{
collections::HashSet,
time::Duration,
Expand Down Expand Up @@ -116,10 +115,13 @@ pub struct AggregateStateData {
pub latest_completed_slot: Option<Slot>,

/// Time of the latest completed update. This is used for the health probes.
pub latest_completed_update_at: Option<Instant>,
pub latest_completed_update_time: Option<SystemTime>,

/// The latest observed slot among different Aggregate updates. This is used for the health
/// probes.
/// probes. The slot is not necessarily the maximum observed slot but it should be close
/// to the maximum. The maximum observed slot is not used because sometimes due to some
/// network issues we might receive an update with a much higher slot specially during
/// the forks.
pub latest_observed_slot: Option<Slot>,

/// The duration of no aggregation after which the readiness of the state is considered stale.
Expand All @@ -140,7 +142,7 @@ impl AggregateStateData {
) -> Self {
Self {
latest_completed_slot: None,
latest_completed_update_at: None,
latest_completed_update_time: None,
latest_observed_slot: None,
metrics: metrics::Metrics::new(metrics_registry),
readiness_staleness_threshold,
Expand Down Expand Up @@ -213,6 +215,17 @@ pub struct PriceFeedsWithUpdateData {
pub update_data: Vec<Vec<u8>>,
}

#[derive(Debug, Serialize)]
pub struct ReadinessMetadata {
pub has_completed_recently: bool,
pub is_not_behind: bool,
pub is_metadata_loaded: bool,
pub latest_completed_slot: Option<Slot>,
pub latest_observed_slot: Option<Slot>,
pub latest_completed_unix_timestamp: Option<UnixTimestamp>,
pub price_feeds_metadata_len: usize,
}

#[async_trait::async_trait]
pub trait Aggregates
where
Expand All @@ -221,7 +234,7 @@ where
Self: PriceFeedMeta,
{
fn subscribe(&self) -> Receiver<AggregationEvent>;
async fn is_ready(&self) -> bool;
async fn is_ready(&self) -> (bool, ReadinessMetadata);
async fn store_update(&self, update: Update) -> Result<()>;
async fn get_price_feed_ids(&self) -> HashSet<PriceIdentifier>;
async fn get_price_feeds_with_update_data(
Expand Down Expand Up @@ -304,10 +317,7 @@ where
// Update the aggregate state with the latest observed slot
{
let mut aggregate_state = self.into().data.write().await;
aggregate_state.latest_observed_slot = aggregate_state
.latest_observed_slot
.map(|latest| latest.max(slot))
.or(Some(slot));
aggregate_state.latest_observed_slot = Some(slot);
}

let accumulator_messages = self.fetch_accumulator_messages(slot).await?;
Expand Down Expand Up @@ -366,8 +376,8 @@ where
.or(Some(slot));

aggregate_state
.latest_completed_update_at
.replace(Instant::now());
.latest_completed_update_time
.replace(SystemTime::now());

aggregate_state
.metrics
Expand Down Expand Up @@ -401,15 +411,20 @@ where
.collect()
}

async fn is_ready(&self) -> bool {
async fn is_ready(&self) -> (bool, ReadinessMetadata) {
let state_data = self.into().data.read().await;
let price_feeds_metadata = PriceFeedMeta::retrieve_price_feeds_metadata(self)
.await
.unwrap();

let has_completed_recently = match state_data.latest_completed_update_at.as_ref() {
let current_time = SystemTime::now();

let has_completed_recently = match state_data.latest_completed_update_time {
Some(latest_completed_update_time) => {
latest_completed_update_time.elapsed() < state_data.readiness_staleness_threshold
current_time
.duration_since(latest_completed_update_time)
.unwrap_or(Duration::from_secs(0))
< state_data.readiness_staleness_threshold
}
None => false,
};
Expand All @@ -419,14 +434,31 @@ where
state_data.latest_observed_slot,
) {
(Some(latest_completed_slot), Some(latest_observed_slot)) => {
latest_observed_slot - latest_completed_slot
latest_observed_slot.saturating_sub(latest_completed_slot)
<= state_data.readiness_max_allowed_slot_lag
}
_ => false,
};

let is_metadata_loaded = !price_feeds_metadata.is_empty();
has_completed_recently && is_not_behind && is_metadata_loaded
(
has_completed_recently && is_not_behind && is_metadata_loaded,
ReadinessMetadata {
has_completed_recently,
is_not_behind,
is_metadata_loaded,
latest_completed_slot: state_data.latest_completed_slot,
latest_observed_slot: state_data.latest_observed_slot,
latest_completed_unix_timestamp: state_data.latest_completed_update_time.and_then(
|t| {
t.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.ok()
},
),
price_feeds_metadata_len: price_feeds_metadata.len(),
},
)
}
}

Expand Down Expand Up @@ -896,14 +928,14 @@ mod test {
.unwrap();

// Check the state is ready
assert!(state.is_ready().await);
assert!(state.is_ready().await.0);

// Advance the clock to make the prices stale
let staleness_threshold = Duration::from_secs(30);
MockClock::advance_system_time(staleness_threshold);
MockClock::advance(staleness_threshold);
// Check the state is not ready
assert!(!state.is_ready().await);
assert!(!state.is_ready().await.0);
}

/// Test that the state retains the latest slots upon cache eviction.
Expand Down

0 comments on commit 29afb2a

Please sign in to comment.